Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/rust-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
name: Check
runs-on: ubuntu-latest
container:
image: rust:1.91
image: rust:1.92
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/setup
Expand All @@ -26,7 +26,7 @@ jobs:
name: Build
runs-on: ubuntu-latest
container:
image: rust:1.91
image: rust:1.92
needs: check
steps:
- uses: actions/checkout@v4
Expand All @@ -39,7 +39,7 @@ jobs:
name: Test
runs-on: ubuntu-latest
container:
image: rust:1.91
image: rust:1.92
needs: build
steps:
- uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ homepage = "https://neocraft.tech/"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/neocrafttech/nyas"
rust-version = "1.91.0"
rust-version = "1.92.0"
version = "0.1.0"

[workspace.dependencies]
Expand Down
14 changes: 12 additions & 2 deletions bolt.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env bash
set -e

RUST_VERSION="1.91.0"
RUST_VERSION="1.92.0"

format() {
cargo +nightly fmt;
Expand Down Expand Up @@ -38,6 +38,12 @@ setup() {
setup_rust
}

clean() {
echo "[INFO] Cleaning workspace..."
cargo clean
echo "[OK] Workspace cleaned!"
}

check() {
echo "[INFO] Running cargo check..."
cargo check
Expand Down Expand Up @@ -71,12 +77,13 @@ bench() {
}

help() {
echo "Usage: $0 [setup|check|build|test|bench|all|help]"
echo "Usage: $0 [setup|check|format|clean|build|test|bench|all|help]"
echo
echo "Commands:"
echo " setup - Install Rust and cargo-nextest"
echo " format - Format the code"
echo " check - Run cargo check, fmt, and clippy"
echo " clean - Clean the workspace"
echo " build - Only build the workspace (runs check first)"
echo " test - Only run tests"
echo " bench - Only run benchmarks"
Expand All @@ -96,6 +103,9 @@ main() {
check)
check
;;
clean)
clean
;;
build)
build
;;
Expand Down
123 changes: 50 additions & 73 deletions nyas/diskann/src/disk_index_storage.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use std::collections::{BinaryHeap, HashMap};
use std::fmt::Debug;
use std::fs::{File, OpenOptions};
use std::io::Write;
#[cfg(unix)]
use std::os::unix::fs::FileExt;
use std::sync::Mutex;

use rkyv::api::high::to_bytes_with_alloc;
use rkyv::ser::allocator::Arena;
use rkyv::{Archive, Deserialize, Serialize, from_bytes, rancor};
use system::metric::MetricType;
use system::vector_data::VectorData;
use system::vector_point::VectorPoint;
use tokio::fs::{File, OpenOptions};
use tokio::io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
use tokio::sync::Mutex;
use tracing::debug;

use crate::in_mem_index::InMemIndex;
Expand Down Expand Up @@ -58,38 +56,36 @@ impl DiskIndexStorage {
Ok(DiskIndexStorage { files: vec![] })
}

pub fn insert(&mut self, mem_index: &InMemIndex) -> Result<(), std::io::Error> {
pub async fn insert(&mut self, mem_index: &InMemIndex) -> io::Result<()> {
let file_count = self.files.len();
let data_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(format!("disk_view_data_{}.bin", file_count))?;
.open(format!("disk_view_data_{}.bin", file_count))
.await?;
let graph_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(format!("disk_view_index_{}.bin", file_count))?;
.open(format!("disk_view_index_{}.bin", file_count))
.await?;
self.files.push(IndexFiles {
data_file: Mutex::new(data_file),
graph_file: Mutex::new(graph_file),
});

self.write_graph(mem_index)?;
self.write_data(mem_index)?;
self.write_graph(mem_index).await?;
self.write_data(mem_index).await?;
Ok(())
}

fn write_graph(&mut self, mem_index: &InMemIndex) -> Result<(), std::io::Error> {
async fn write_graph(&mut self, mem_index: &InMemIndex) -> io::Result<()> {
let last_file =
self.files.last().ok_or_else(|| std::io::Error::other("No files in the list"))?;

let mut file_guard = last_file
.graph_file
.lock()
.map_err(|e| std::io::Error::other(format!("Failed to lock file Mutex: {}", e)))?;

let mut writer = std::io::BufWriter::new(&mut *file_guard);
let mut file_guard = last_file.graph_file.lock().await;
let file: &mut File = &mut file_guard;

let binding =
mem_index.points.iter().next().ok_or_else(|| {
Expand All @@ -114,11 +110,11 @@ impl DiskIndexStorage {
let mut arena = Arena::new();
let meta_bytes = to_bytes_with_alloc::<_, rancor::Error>(&disk_meta, arena.acquire())
.map_err(|e| std::io::Error::other(e.to_string()))?;
writer.write_all(&meta_bytes)?;
file.write_all(&meta_bytes).await?;

let padding_len = BLOCK_SIZE.saturating_sub(meta_bytes.len());
if padding_len > 0 {
writer.write_all(&vec![0u8; padding_len])?;
file.write_all(&vec![0u8; padding_len]).await?;
}
let mut block_buffer = Vec::with_capacity(BLOCK_SIZE);
let mut nodes_in_current_block = 0;
Expand Down Expand Up @@ -151,7 +147,7 @@ impl DiskIndexStorage {
block_buffer.resize(BLOCK_SIZE, 0);
}

writer.write_all(&block_buffer)?;
file.write_all(&block_buffer).await?;

block_buffer.clear();
nodes_in_current_block = 0;
Expand All @@ -161,21 +157,14 @@ impl DiskIndexStorage {
if block_buffer.len() < BLOCK_SIZE {
block_buffer.resize(BLOCK_SIZE, 0);
}
writer.write_all(&block_buffer)?;
file.write_all(&block_buffer).await?;
}
writer.flush()?;
Ok(())
}

fn write_data(&mut self, mem_index: &InMemIndex) -> Result<(), std::io::Error> {
let mut file_guard = self
.files
.last()
.unwrap()
.data_file
.lock()
.map_err(|e| std::io::Error::other(format!("Failed to lock file Mutex: {}", e)))?;
let mut writer = std::io::BufWriter::new(&mut *file_guard);
async fn write_data(&mut self, mem_index: &InMemIndex) -> io::Result<()> {
let mut file_guard = self.files.last().unwrap().data_file.lock().await;
let file: &mut File = &mut file_guard;
if mem_index.points.is_empty() {
return Err(std::io::Error::other("Empty index"));
}
Expand Down Expand Up @@ -210,7 +199,7 @@ impl DiskIndexStorage {
block_buffer.resize(BLOCK_SIZE, 0);
}

writer.write_all(&block_buffer)?;
file.write_all(&block_buffer).await?;

block_buffer.clear();
nodes_in_current_block = 0;
Expand All @@ -220,22 +209,18 @@ impl DiskIndexStorage {
if block_buffer.len() < BLOCK_SIZE {
block_buffer.resize(BLOCK_SIZE, 0);
}
writer.write_all(&block_buffer)?;
file.write_all(&block_buffer).await?;
}
writer.flush()?;
Ok(())
}

pub fn get_nodes(
pub async fn get_nodes(
&self, file: &Mutex<File>, block_offset: u64, total_nodes: u64,
) -> Result<Vec<DiskNode>, std::io::Error> {
let mut nodes = Vec::with_capacity(total_nodes as usize);
let mut block_offset = block_offset;
while nodes.len() < total_nodes as usize {
let file_guard = file
.lock()
.map_err(|e| std::io::Error::other(format!("Failed to lock file Mutex: {}", e)))?;
let disk_bytes = Self::read_bytes(file_guard, block_offset)?;
let disk_bytes = Self::read_bytes(file, block_offset).await?;
let mut offset = 0;
while offset + 8 < BLOCK_SIZE && nodes.len() < total_nodes as usize {
if offset % ALIGNMENT != 0 {
Expand All @@ -262,18 +247,14 @@ impl DiskIndexStorage {
Ok(nodes)
}

pub fn get_points(
pub async fn get_points(
&self, file: &Mutex<File>, block_offset: u64, nodes_per_block: u64, point_size: u64,
total_nodes: u64,
) -> Result<Vec<VectorPoint>, std::io::Error> {
) -> io::Result<Vec<VectorPoint>> {
let mut block_offset = block_offset;
let mut points = Vec::with_capacity(nodes_per_block as usize);
while points.len() < total_nodes as usize {
let file_guard = file
.lock()
.map_err(|e| std::io::Error::other(format!("Failed to lock file Mutex: {}", e)))?;

let disk_bytes = Self::read_bytes(file_guard, block_offset)?;
let disk_bytes = Self::read_bytes(file, block_offset).await?;
let mut offset = 0;
while offset < BLOCK_SIZE && points.len() < total_nodes as usize {
if offset % ALIGNMENT != 0 {
Expand All @@ -292,12 +273,8 @@ impl DiskIndexStorage {
}

#[inline(always)]
fn get_disk_meta(&self, file: &Mutex<File>) -> Result<DiskMeta, std::io::Error> {
let file_guard = file
.lock()
.map_err(|e| std::io::Error::other(format!("Failed to lock file Mutex: {}", e)))?;

let meta_bytes = Self::read_bytes(file_guard, 0)?;
async fn get_disk_meta(&self, file: &Mutex<File>) -> Result<DiskMeta, std::io::Error> {
let meta_bytes = Self::read_bytes(file, 0).await?;

let size = std::mem::size_of::<DiskMeta>();
let meta = from_bytes::<DiskMeta, rancor::Error>(&meta_bytes[0..size])
Expand All @@ -306,30 +283,33 @@ impl DiskIndexStorage {
Ok(meta)
}

pub fn search(
pub async fn search(
&self, query: &VectorData, l: usize, visited_map: &mut HashMap<u64, VectorPoint>,
) {
for file in self.files.iter() {
let disk_meta = self.get_disk_meta(&file.graph_file);
let disk_meta = self.get_disk_meta(&file.graph_file).await;
let Ok(disk_meta) = disk_meta else {
debug!("Disk metadata not found");
continue;
};
debug!("Disk Meta found: {:?}", disk_meta);

let Ok(nodes) = self.get_nodes(&file.graph_file, 1, disk_meta.total_nodes) else {
let Ok(nodes) = self.get_nodes(&file.graph_file, 1, disk_meta.total_nodes).await else {
debug!("Nodes not found");
continue;
};
debug!("Nodes {:?}", nodes.iter().map(|n| n.id).collect::<Vec<_>>());

let Ok(points) = self.get_points(
&file.data_file,
0,
disk_meta.nodes_per_block,
disk_meta.point_size * 2,
disk_meta.total_nodes,
) else {
let Ok(points) = self
.get_points(
&file.data_file,
0,
disk_meta.nodes_per_block,
disk_meta.point_size * 2,
disk_meta.total_nodes,
)
.await
else {
debug!("Points not found in file");
continue;
};
Expand Down Expand Up @@ -404,19 +384,16 @@ impl DiskIndexStorage {
}
}

fn read_bytes(
file_guard: std::sync::MutexGuard<'_, File>, block_offset: u64,
) -> Result<[u8; BLOCK_SIZE], std::io::Error> {
async fn read_bytes(
file: &Mutex<File>, block_offset: u64,
) -> std::io::Result<[u8; BLOCK_SIZE]> {
let mut bytes = [0u8; BLOCK_SIZE];
let offset = block_offset * BLOCK_SIZE as u64;
#[cfg(unix)]
file_guard.read_at(&mut bytes, offset)?;
#[cfg(not(unix))]
{
let mut file = &*file_guard;
file.seek(SeekFrom::Start(offset))?;
file.read_exact(&mut bytes)?;
}

let mut f = file.lock().await;
f.seek(SeekFrom::Start(offset)).await?;
f.read_exact(&mut bytes).await?;

Ok(bytes)
}
}
Loading
Loading