diff --git a/.github/workflows/benches.yml b/.github/workflows/benches.yml index a4e6a877..ba603acb 100644 --- a/.github/workflows/benches.yml +++ b/.github/workflows/benches.yml @@ -4,10 +4,16 @@ run-name: Benchmark on ${{ github.ref_name }} - ${{ github.sha }} on: workflow_dispatch +env: + REDIS_HOST: "redis" + POSTGRES_HOST: "postgres" + jobs: bench: - uses: Cosmian/reusable_workflows/.github/workflows/cargo-bench.yml@develop + uses: Cosmian/reusable_workflows/.github/workflows/cargo-bench.yml@fix/add_hosts_arguments with: - toolchain: stable - features: test-utils,redis-mem,sqlite-mem,rust-mem,postgres-mem + toolchain: 1.87.0 + features: test-utils,redis-mem,sqlite-mem,postgres-mem force: true + redis-host: "redis" + postgres-host: "postgres" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 009d7719..b6f777c3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,14 +7,34 @@ jobs: cargo-lint: uses: Cosmian/reusable_workflows/.github/workflows/cargo-nursery.yml@develop with: - toolchain: stable + toolchain: 1.87.0 + workspace: true + cargo-machete: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + with: + toolchain: 1.87.0 + - name: Install cargo-machete + run: cargo install cargo-machete + - name: Check unused dependencies in findex crate + run: | + cd crate/findex + cargo machete --with-metadata + - name: Check unused dependencies in memories crate + run: | + cd crate/memories + cargo machete --with-metadata cargo-publish: needs: - cargo-lint uses: Cosmian/reusable_workflows/.github/workflows/cargo-publish.yml@develop if: startsWith(github.ref, 'refs/tags/') with: - toolchain: stable + toolchain: 1.87.0 secrets: inherit cleanup: needs: diff --git a/.github/workflows/hack.yml b/.github/workflows/hack.yml index 78db6cb2..c8e6fa9c 100644 --- a/.github/workflows/hack.yml +++ b/.github/workflows/hack.yml @@ -35,7 +35,7 @@ jobs: - uses: actions/checkout@v1 - uses: actions-rs/toolchain@v1 with: - toolchain: stable + toolchain: 1.87.0 override: true - name: Install cargo-hack run: cargo install --locked cargo-hack || true diff --git a/.gitignore b/.gitignore index aad6b0e6..973da758 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ target/ perf* flamegraph.svg **/*.tgz +*.sqlite.db* diff --git a/Cargo.toml b/Cargo.toml index aec0d3d9..8658cf3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,9 @@ -[package] -name = "cosmian_findex" -version = "7.1.0" +[workspace] +members = ["crate/findex", "crate/memories"] +resolver = "2" + +[workspace.package] +version = "8.0.0" authors = [ "Bruno Grieder ", "Célia Corsin ", @@ -15,56 +18,13 @@ keywords = ["SSE"] license = "BUSL-1.1" repository = "https://github.com/Cosmian/findex/" description = "Symmetric Searchable Encryption" +readme = "README.md" -[lib] -name = "cosmian_findex" -path = "src/lib.rs" - -[features] -rust-mem = [] -redis-mem = ["redis"] -sqlite-mem = ["async-sqlite"] -test-utils = ["tokio", "criterion", "futures", "rand", "rand_distr"] -postgres-mem = ["tokio-postgres", "tokio", "deadpool-postgres"] -[dependencies] -aes = "0.8" +[workspace.dependencies] cosmian_crypto_core = { version = "10.1", default-features = false, features = [ "macro", "sha3", ] } -xts-mode = "0.5" - -# Used in benches and tests. 
-criterion = { version = "0.5", optional = true }
-futures = { version = "0.3", optional = true }
-rand = { version = "0.9.0", optional = true }
-rand_distr = { version = "0.5.1", optional = true }
-tokio = { version = "1.44", features = ["rt-multi-thread"], optional = true }
-
-
-# Memory dependencies
-async-sqlite = { version = "0.5", optional = true }
-deadpool-postgres = { version = "0.14.1", optional = true }
-redis = { version = "0.28", features = [
-    "aio",
-    "connection-manager",
-    "tokio-comp",
-], optional = true }
-tokio-postgres = { version = "0.7.9", optional = true, features = [
-    "array-impls",
-] }
-
-
-[dev-dependencies]
-futures = { version = "0.3" }
-tokio = { version = "1.44", features = ["macros", "rt-multi-thread"] }
-
-[[bench]]
-name = "benches"
-harness = false
-required-features = ["test-utils"]
-
-[[example]]
-name = "insert"
-required-features = ["test-utils"]
+criterion = { version = "0.6" }
+tokio = { version = "1.45" }
diff --git a/README.md b/README.md
index 45d055ac..a67ac6e2 100644
--- a/README.md
+++ b/README.md
@@ -1,22 +1,22 @@
-# Findex
+# Findex: Symmetric Searchable Encryption
 
-To build Findex simply run:
+[![Crates.io](https://img.shields.io/crates/v/cosmian_findex.svg)](https://crates.io/crates/cosmian_findex)
+[![Documentation](https://docs.rs/cosmian_findex/badge.svg)](https://docs.rs/cosmian_findex)
+[![License](https://img.shields.io/badge/License-BUSL--1.1-blue.svg)](LICENSE)
 
-```bash
-cargo build --release
-```
+Findex is a Symmetric Searchable Encryption (SSE) library that enables search over encrypted data. It allows you to securely index and search encrypted data without compromising privacy or security.
 
-To test, run:
+## Architecture
 
-```bash
-cargo test --release --all-features
-```
+This repository is organized as a Rust workspace with two crates:
 
-To launch the benchmarks, run:
+- `cosmian_findex`: Core library implementing the SSE algorithms
+- `cosmian_findex_memories`: Storage back-end implementations for different databases
 
-```bash
-cargo bench --all-features
-```
+## Related Projects
 
-Note that benches are quite involving and require *several hours* for a full
-run. Once all benchmarks are run, you will find detailed reports under `target/criterion`.
+[Findex Server](https://github.com/cosmian/findex-server) - A production-ready Findex server implementation
+
+## License
+
+This project is licensed under the Business Source License 1.1 (BUSL-1.1).
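For a concrete feel of the new layout, here is a minimal usage sketch of the core crate, assembled from the example and tests that appear later in this diff (`Findex::new`, the `IndexADT` insert/search methods, the test-only `InMemory` back end and the `dummy_encode`/`dummy_decode` codecs, all gated behind the `test-utils` feature). Treat it as an illustrative sketch under those assumptions, not a stable API reference:

```rust
use cosmian_crypto_core::{define_byte_type, reexport::rand_core::SeedableRng, CsRng, Secret};
use cosmian_findex::{
    dummy_decode, dummy_encode, Address, Findex, InMemory, IndexADT, MemoryEncryptionLayer,
    ADDRESS_LENGTH, WORD_LENGTH,
};
use std::collections::HashSet;

// An 8-byte value type, as used by the tests in this diff.
define_byte_type!(Bytes);
type Value = Bytes<8>;

#[tokio::main]
async fn main() {
    // Generate a fresh Findex key: all users must share the same key.
    let mut rng = CsRng::from_entropy();
    let seed = Secret::random(&mut rng);

    // The test-only in-memory back end, wrapped in the encryption layer.
    let memory = MemoryEncryptionLayer::new(
        &seed,
        InMemory::<Address<ADDRESS_LENGTH>, [u8; WORD_LENGTH]>::default(),
    );
    let findex = Findex::new(memory, dummy_encode::<WORD_LENGTH, _>, dummy_decode);

    // Bind a value to the keyword "cat", then search for it.
    let bindings = HashSet::from([Value::try_from([1u8; 8].as_slice()).expect("valid length")]);
    findex
        .insert("cat".to_string(), bindings.clone())
        .await
        .expect("insert failed");
    let res = findex.search(&"cat".to_string()).await.expect("search failed");
    assert_eq!(res, bindings);
}
```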
diff --git a/benches/data/concurrent.dat b/benches/data/concurrent.dat deleted file mode 100644 index acd3ed33..00000000 --- a/benches/data/concurrent.dat +++ /dev/null @@ -1,8 +0,0 @@ -1 0 -2 56.00 -3 103.00 -4 155.00 -5 211.00 -6 260.00 -7 307.00 -8 360.00 diff --git a/benches/data/insert.dat b/benches/data/insert.dat deleted file mode 100644 index 7c073cdf..00000000 --- a/benches/data/insert.dat +++ /dev/null @@ -1,16 +0,0 @@ -10 8.33 -16 11.62 -26 17.01 -40 24.88 -64 36.90 -100 57.03 -159 89.30 -252 139.04 -399 222.42 -631 352.70 -1000 545.50 -1585 911.40 -2512 1323.00 -3982 2230.00 -6310 3661.00 -10000 5600.00 diff --git a/benches/data/search.dat b/benches/data/search.dat deleted file mode 100644 index 149a60f9..00000000 --- a/benches/data/search.dat +++ /dev/null @@ -1,16 +0,0 @@ -10 6.96 -16 10.00 -26 15.16 -40 22.45 -64 34.38 -100 52.34 -159 82.67 -252 128.00 -399 234.20 -631 323.40 -1000 513.40 -1585 844.50 -2512 1385.00 -3982 2356.00 -6310 3985.00 -10000 6745.00 diff --git a/benches/make_figures.tex b/benches/make_figures.tex deleted file mode 100644 index cffbd0c0..00000000 --- a/benches/make_figures.tex +++ /dev/null @@ -1,76 +0,0 @@ -\documentclass{article} -\usepackage{pgfplotstable} - -\begin{document} - -\begin{figure} - \centering - \begin{tikzpicture} - \begin{axis}[ - legend pos=north west, - xlabel={\#bindings}, - ylabel={time ($\mu$s)}, - xmin=1, xmax=10000, - grid=both - ] - \addplot[color=blue, mark=x] table {./data/search.dat}; - \addplot[color=red] table [y={create col/linear regression}] {./data/search.dat}; - \addlegendentry{search-time(\#bindings)} - \addlegendentry{ - $ y = - \pgfmathprintnumber{\pgfplotstableregressiona} - \cdot b - \pgfmathprintnumber[print sign]{\pgfplotstableregressionb}$ - } - \end{axis} - \end{tikzpicture} - \begin{tikzpicture} - \begin{axis}[ - legend pos=north west, - xlabel={\#bindings}, - ylabel={time ($\mu$s)}, - xmin=1, xmax=10000, - grid=both - ] - \addplot[color=blue, mark=x] table {./data/insert.dat}; - \addplot[color=red] table [y={create col/linear regression}] {./data/insert.dat}; - \addlegendentry{insertion-time(\#bindings)} - \addlegendentry{ - $ y = - \pgfmathprintnumber{\pgfplotstableregressiona} - \cdot b - \pgfmathprintnumber[print sign]{\pgfplotstableregressionb}$ - } - \end{axis} - \end{tikzpicture} - \caption[multi-binding search]{(Left) Client-side computation time (in $\mu$s) for a single-keyword search, given the number of bound one-word values. 
(Right) Client-side computation time (in $\mu$s) for a single-keyword insert, given the number of bound one-word values.}
-    \label{fig:multi-binding-search}
-    \label{fig:multi-binding-insert}
-\end{figure}
-
-\begin{figure}
-    \centering
-    \begin{tikzpicture}
-        \begin{axis}[
-                legend pos=north west,
-                xlabel={\#clients},
-                ylabel={time ($\mu$s)},
-                grid=both
-            ]
-            \addplot[color=blue, mark=x] table {./data/concurrent.dat};
-            \addlegendentry{insertion-time(\#clients)}
-            \addplot[color=red] table [y={create col/linear regression}] {./data/concurrent.dat};
-            \addlegendentry{
-                $ y =
-                \pgfmathprintnumber{\pgfplotstableregressiona}
-                \cdot c
-                \pgfmathprintnumber[print sign]{\pgfplotstableregressionb}$
-            }
-        \end{axis}
-    \end{tikzpicture}
-    \caption[concurrent insert]{Concurrency overhead (in $\mu$s) for adding 100 bindings on the same keyword, given the number of concurrent clients.}
-    \label{fig:concurrent-insert}
-\end{figure}
-
-
-\end{document}
diff --git a/crate/findex/Cargo.toml b/crate/findex/Cargo.toml
new file mode 100644
index 00000000..0bfcb25c
--- /dev/null
+++ b/crate/findex/Cargo.toml
@@ -0,0 +1,38 @@
+[package]
+name = "cosmian_findex"
+version.workspace = true
+authors.workspace = true
+categories.workspace = true
+edition.workspace = true
+keywords.workspace = true
+license.workspace = true
+repository.workspace = true
+description.workspace = true
+readme = "README.md"
+
+[lib]
+name = "cosmian_findex"
+path = "src/lib.rs"
+
+[features]
+test-utils = ["tokio", "criterion"]
+
+[dependencies]
+cosmian_crypto_core.workspace = true
+aes = "0.8"
+xts-mode = "0.5"
+
+# Optional dependencies for testing and benchmarking.
+tokio = { workspace = true, features = [
+    "rt-multi-thread",
+    "macros",
+], optional = true }
+criterion = { workspace = true, optional = true }
+futures = "0.3.31"
+
+[dev-dependencies]
+tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
+
+[[example]]
+name = "insert"
+required-features = ["test-utils"]
diff --git a/crate/findex/README.md b/crate/findex/README.md
new file mode 100644
index 00000000..0bd5ec05
--- /dev/null
+++ b/crate/findex/README.md
@@ -0,0 +1,14 @@
+# Findex
+
+This crate provides the core functionality of Findex, defining the abstract data types, cryptographic operations, and encoding algorithms.
+
+## Setup
+
+Add `cosmian_findex` as a dependency to your project:
+
+```toml
+[dependencies]
+cosmian_findex = "8.0.0"
+```
+
+A usage example is available in the [examples folder](./examples).
diff --git a/examples/insert.rs b/crate/findex/examples/insert.rs
similarity index 93%
rename from examples/insert.rs
rename to crate/findex/examples/insert.rs
index 0a97cadc..d7d4ccd8 100644
--- a/examples/insert.rs
+++ b/crate/findex/examples/insert.rs
@@ -5,7 +5,6 @@ use cosmian_crypto_core::{
     reexport::rand_core::{CryptoRngCore, SeedableRng},
 };
 use cosmian_findex::{Findex, InMemory, IndexADT, MemoryEncryptionLayer, Op};
-use futures::executor::block_on;
 use std::collections::{HashMap, HashSet};
 
 /// This function generates a random set of (key, values) couples. Since Findex
@@ -85,7 +84,8 @@ fn decoder(words: Vec<[u8; WORD_LENGTH]>) -> Result<HashSet<u64>, String> {
     Ok(values)
 }
 
-fn main() {
+#[tokio::main]
+async fn main() {
     // For cryptographic applications, it is important to use a secure RNG. In
     // Rust, those RNG implement the `CryptoRng` trait.
     let mut rng = CsRng::from_entropy();
@@ -115,18 +115,16 @@
     // Here we insert all bindings one by one, blocking on each call. A better
    // way would be to perform all such calls in parallel using tasks.
-    index
-        .clone()
-        .into_iter()
-        .for_each(|(kw, vs)| block_on(findex.insert(kw, vs)).expect("insert failed"));
-
+    for (kw, vs) in index.clone().into_iter() {
+        findex.insert(kw, vs).await.expect("insert failed");
+    }
     // In order to verify insertion was correctly performed, we search for all
     // the indexed keywords...
-    let res = index
-        .keys()
-        .cloned()
-        .map(|kw| (kw, block_on(findex.search(&kw)).expect("search failed")))
-        .collect::<HashMap<_, _>>();
+    let mut res = HashMap::new();
+    for kw in index.keys().cloned() {
+        let values = findex.search(&kw).await.expect("search failed");
+        res.insert(kw, values);
+    }
 
     // ... and verify we get the whole index back!
     assert_eq!(res, index);
diff --git a/src/address.rs b/crate/findex/src/address.rs
similarity index 100%
rename from src/address.rs
rename to crate/findex/src/address.rs
diff --git a/src/adt.rs b/crate/findex/src/adt.rs
similarity index 68%
rename from src/adt.rs
rename to crate/findex/src/adt.rs
index cc381716..3341400e 100644
--- a/src/adt.rs
+++ b/crate/findex/src/adt.rs
@@ -33,6 +33,36 @@ pub trait IndexADT<Keyword: Send + Sync + Hash, Value: Send + Sync + Hash> {
     ) -> impl Send + Future<Output = Result<(), Self::Error>>;
 }
 
+/// BatcherSSEADT (Batched Searchable Symmetric Encryption Abstract Data Type)
+///
+/// This trait defines batch operations for encrypted searchable indices. It extends
+/// the functionality of the standard `IndexADT` by providing methods that operate on
+/// multiple keywords or entries simultaneously to reduce network overhead and improve performance.
+pub trait BatcherSSEADT<Keyword: Send + Sync + Hash, Value: Send + Sync + Hash + Eq> {
+    // TODO: maybe add the Findex functions as trait items
+    // type Findex: IndexADT<Keyword, Value> + Send + Sync; // need those?
+    // type BatcherMemory: BatchingMemoryADT<Address = Address<ADDRESS_LENGTH>, Word = [u8; WORD_LENGTH]>;
+    type Error: Send + Sync + std::error::Error;
+
+    /// Search the index for the values bound to the given keywords.
+    fn batch_search(
+        &self,
+        keywords: Vec<&Keyword>, // n --> n batch reads --> n+1
+    ) -> impl Future<Output = Result<Vec<HashSet<Value>>, Self::Error>>;
+
+    /// Adds the given values to the index.
+    fn batch_insert(
+        &self,
+        entries: Vec<(Keyword, impl Sync + Send + IntoIterator<Item = Value>)>,
+    ) -> impl Send + Future<Output = Result<(), Self::Error>>;
+
+    /// Removes the given values from the index.
+    fn batch_delete(
+        &self,
+        entries: Vec<(Keyword, impl Sync + Send + IntoIterator<Item = Value>)>,
+    ) -> impl Send + Future<Output = Result<(), Self::Error>>;
+}
+
 pub trait VectorADT: Send + Sync {
     /// Vectors are homogeneous.
     type Value: Send + Sync;
@@ -76,6 +106,21 @@ pub trait MemoryADT {
     ) -> impl Send + Future<Output = Result<Option<Self::Word>, Self::Error>>;
 }
 
+// Same as MemoryADT but also batches writes
+pub trait BatchingMemoryADT: MemoryADT {
+    // You only need to declare the NEW methods here
+    // The associated types and existing methods from MemoryADT are inherited
+
+    /// Writes a batch of guarded write operations with bindings.
+    fn batch_guarded_write(
+        &self,
+        operations: Vec<(
+            (Self::Address, Option<Self::Word>),
+            Vec<(Self::Address, Self::Word)>,
+        )>,
+    ) -> impl Send + Future<Output = Result<Vec<Option<Self::Word>>, Self::Error>>;
+}
+
 #[cfg(test)]
 pub mod tests {
 
@@ -85,7 +130,6 @@ pub mod tests {
     //! This module defines tests any implementation of the VectorADT interface must pass.
 
     use crate::adt::VectorADT;
-    use futures::{executor::block_on, future::join_all};
 
     /// Adding information from different copies of the same vector should be visible by all
     /// copies.
@@ -127,10 +171,11 @@ pub mod tests {
                 })
             })
             .collect::<Vec<_>>();
-        for h in join_all(handles).await {
-            h.unwrap();
+        for h in handles {
+            h.await
+                .expect("Join handle failed during test_vector_concurrent");
         }
-        let mut res = block_on(v.read()).unwrap();
+        let mut res = v.read().await.unwrap();
         let old = res.clone();
         res.sort();
         assert_ne!(old, res);
diff --git a/crate/findex/src/batcher_findex.rs b/crate/findex/src/batcher_findex.rs
new file mode 100644
index 00000000..d1455c2d
--- /dev/null
+++ b/crate/findex/src/batcher_findex.rs
@@ -0,0 +1,386 @@
+use crate::adt::{BatcherSSEADT, BatchingMemoryADT};
+use crate::error::BatchFindexError;
+use crate::memory::MemoryBatcher;
+use crate::{ADDRESS_LENGTH, Address, Decoder, Encoder, Error, Findex, IndexADT};
+use std::sync::atomic::AtomicUsize;
+use std::{collections::HashSet, fmt::Debug, hash::Hash, sync::Arc};
+// TODO: should all of these be sync?
+
+#[derive(Debug)]
+pub struct BatcherFindex<
+    const WORD_LENGTH: usize,
+    Value: Send + Sync + Hash + Eq,
+    EncodingError: Send + Sync + Debug,
+    BatcherMemory: Clone
+        + Send
+        + Sync
+        + BatchingMemoryADT<Address = Address<ADDRESS_LENGTH>, Word = [u8; WORD_LENGTH]>,
+> {
+    memory: BatcherMemory,
+    encode: Arc<Encoder<WORD_LENGTH, Value, EncodingError>>,
+    decode: Arc<Decoder<WORD_LENGTH, Value, EncodingError>>,
+}
+
+impl<
+    const WORD_LENGTH: usize,
+    Value: Send + Sync + Hash + Eq,
+    BatcherMemory: Debug
+        + Send
+        + Sync
+        + Clone
+        + BatchingMemoryADT<Address = Address<ADDRESS_LENGTH>, Word = [u8; WORD_LENGTH]>,
+    EncodingError: Send + Sync + Debug + std::error::Error,
+> BatcherFindex<WORD_LENGTH, Value, EncodingError, BatcherMemory>
+{
+    pub fn new(
+        memory: BatcherMemory,
+        encode: Encoder<WORD_LENGTH, Value, EncodingError>,
+        decode: Decoder<WORD_LENGTH, Value, EncodingError>,
+    ) -> Self {
+        Self {
+            memory,
+            encode: Arc::new(encode),
+            decode: Arc::new(decode),
+        }
+    }
+
+    // Insert or delete are both an unbounded number of calls to `guarded_write` on the memory layer.
+    async fn batch_insert_or_delete<Keyword>(
+        &self,
+        entries: Vec<(Keyword, impl Sync + Send + IntoIterator<Item = Value>)>,
+        is_insert: bool,
+    ) -> Result<(), BatchFindexError<BatcherMemory>>
+    where
+        Keyword: Send + Sync + Hash + Eq,
+    {
+        let mut search_futures = Vec::new();
+        let n = entries.len();
+        let buffered_memory = Arc::new(MemoryBatcher::new_writer(
+            self.memory.clone(),
+            AtomicUsize::new(n),
+        ));
+
+        for (guard_keyword, bindings) in entries {
+            let memory_arc = buffered_memory.clone();
+            let future = async move {
+                // Create a temporary Findex instance using the shared batching layer
+                let findex: Findex<
+                    WORD_LENGTH,
+                    Value,
+                    EncodingError,
+                    Arc<MemoryBatcher<BatcherMemory>>,
+                > = Findex::<WORD_LENGTH, Value, EncodingError, _>::new(
+                    // this (cheap) Arc clone is necessary because `decrement_capacity` is called
+                    // below and needs to be able to access the Arc
+                    memory_arc.clone(),
+                    *self.encode,
+                    *self.decode,
+                );
+
+                let result = if is_insert {
+                    findex.insert(guard_keyword, bindings).await
+                } else {
+                    findex.delete(guard_keyword, bindings).await
+                };
+
+                // Convert the Findex error to a `BatchFindexError` manually if needed
+                if let Err(findex_err) = result {
+                    return Err(BatchFindexError::FindexError(findex_err));
+                }
+                // once one of the operations succeeds, we should make the buffer smaller
+                memory_arc.decrement_capacity().await?;
+                Ok(())
+            };
+
+            search_futures.push(future);
+        }
+
+        // Execute all futures concurrently and collect results
+        futures::future::try_join_all(search_futures).await?;
+
+        Ok(())
+    }
+}
+
+impl<
+    const WORD_LENGTH: usize,
+    Keyword: Send + Sync + Hash + Eq,
+    Value: Send + Sync + Hash + Eq,
+    EncodingError: Send + Sync + Debug + std::error::Error,
+    BatcherMemory: Debug
+        + Send
+        + Sync
+        + Clone
+        + BatchingMemoryADT<Address = Address<ADDRESS_LENGTH>, Word = [u8; WORD_LENGTH]>,
+> BatcherSSEADT<Keyword, Value>
+    for BatcherFindex<WORD_LENGTH, Value, EncodingError, BatcherMemory>
+{
+    // type Findex = Findex<WORD_LENGTH, Value, EncodingError, BatcherMemory>;
+    // type BatcherMemory = BatcherMemory;
+    type Error = BatchFindexError<BatcherMemory>;
+
+    async fn batch_search(
+        &self,
+        keywords: Vec<&Keyword>,
+    ) -> Result<Vec<HashSet<Value>>, Self::Error> {
+        let mut search_futures = Vec::new();
+        let n = keywords.len();
+        let buffered_memory = Arc::new(MemoryBatcher::new_reader(
+            self.memory.clone(),
+            AtomicUsize::new(n),
+        ));
+
+        for keyword in keywords {
+            let buffered_memory_clone = buffered_memory.clone();
+            let future = async move {
+                // Create a temporary Findex instance using the shared batching layer
+                let findex: Findex<
+                    WORD_LENGTH,
+                    Value,
+                    EncodingError,
+                    Arc<MemoryBatcher<BatcherMemory>>,
+                > = Findex::<WORD_LENGTH, Value, EncodingError, _>::new(
+                    buffered_memory_clone,
+                    *self.encode,
+                    *self.decode,
+                );
+
+                // Execute the search
+                findex.search(keyword).await
+            };
+
+            search_futures.push(future);
+        }
+        // at this point nothing is polled yet
+
+        // Execute all futures concurrently and collect results
+        let results = futures::future::join_all(search_futures).await;
+
+        // Process results, propagating the first search error instead of panicking
+        let mut output = Vec::with_capacity(results.len());
+        for result in results {
+            output.push(result?);
+        }
+
+        Ok(output)
+    }
+
+    async fn batch_insert(
+        &self,
+        entries: Vec<(Keyword, impl Sync + Send + IntoIterator<Item = Value>)>,
+    ) -> Result<(), Self::Error> {
+        self.batch_insert_or_delete(entries, true).await
+    }
+
+    async fn batch_delete(
+        &self,
+        entries: Vec<(Keyword, impl Sync + Send + IntoIterator<Item = Value>)>,
+    ) -> Result<(), Self::Error> {
+        self.batch_insert_or_delete(entries, false).await
+    }
+}
+
+// // The underlying tests assume the existence of a `Findex` implementation that is correct.
+// // The testing strategy for each function is to use the `Findex` implementation to perform the same operations
+// // and compare the results with the `BatcherFindex` implementation.
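Before the commented-out tests below, here is a sketch of the call pattern `BatcherFindex` is designed for. It assumes the test-only `InMemory` store (which gains a `BatchingMemoryADT` implementation later in this diff), the dummy codecs, an 8-byte `Value` type as in the tests, and that `Encoder`/`Decoder` are plain function pointers; the `CodecError` wrapper is hypothetical glue to satisfy the `std::error::Error` bound on the encoding error:

```rust
// Hypothetical error wrapper: dummy codecs fail with a `String`.
#[derive(Debug)]
struct CodecError(String);
impl std::fmt::Display for CodecError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}
impl std::error::Error for CodecError {}

async fn demo() -> Result<(), Box<dyn std::error::Error>> {
    // The trivial in-memory store implements `BatchingMemoryADT` (see below in this diff).
    let memory = InMemory::<Address<ADDRESS_LENGTH>, [u8; WORD_LENGTH]>::default();
    let batcher = BatcherFindex::<WORD_LENGTH, Value, CodecError, _>::new(
        memory,
        |op, values| dummy_encode::<WORD_LENGTH, _>(op, values).map_err(CodecError),
        |words| dummy_decode(words).map_err(CodecError),
    );

    // One writer batch: both keywords are flushed through shared
    // `batch_guarded_write` round-trips instead of separate calls.
    let (cat, dog) = (
        Value::try_from([1u8; 8].as_slice())?,
        Value::try_from([2u8; 8].as_slice())?,
    );
    batcher
        .batch_insert(vec![
            ("cat".to_string(), vec![cat.clone()]),
            ("dog".to_string(), vec![dog.clone()]),
        ])
        .await?;

    // One reader batch: both searches are coalesced into shared `batch_read` calls.
    let (key1, key2) = ("cat".to_string(), "dog".to_string());
    let results = batcher.batch_search(vec![&key1, &key2]).await?;
    assert_eq!(results, vec![HashSet::from([cat]), HashSet::from([dog])]);
    Ok(())
}
```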
+// #[cfg(test)] +// mod tests { +// use super::*; +// use crate::{ +// ADDRESS_LENGTH, Error, Findex, InMemory, IndexADT, address::Address, dummy_decode, +// dummy_encode, +// }; +// use cosmian_crypto_core::define_byte_type; +// use std::collections::HashSet; + +// type Value = Bytes<8>; +// define_byte_type!(Bytes); + +// impl TryFrom for Bytes { +// type Error = String; +// fn try_from(value: usize) -> Result { +// Self::try_from(value.to_be_bytes().as_slice()).map_err(|e| e.to_string()) +// } +// } + +// const WORD_LENGTH: usize = 16; + +// #[tokio::test] +// async fn test_batch_insert() { +// let trivial_memory = InMemory::, [u8; WORD_LENGTH]>::default(); + +// let batcher_findex = BatcherFindex::::new( +// trivial_memory.clone(), +// |op, values| { +// dummy_encode::(op, values) +// .map_err(|e| Error::>::BatchedOperationError(e)) +// }, +// |words| { +// dummy_decode(words) +// .map_err(|e| Error::>::BatchedOperationError(e)) +// }, +// ); + +// let cat_bindings = vec![ +// Value::try_from(1).unwrap(), +// Value::try_from(2).unwrap(), +// Value::try_from(3).unwrap(), +// ]; +// let dog_bindings = vec![ +// Value::try_from(4).unwrap(), +// Value::try_from(5).unwrap(), +// Value::try_from(6).unwrap(), +// ]; + +// // Batch insert multiple entries +// let entries = vec![ +// ("cat".to_string(), cat_bindings.clone()), +// ("dog".to_string(), dog_bindings.clone()), +// ]; + +// batcher_findex.batch_insert(entries).await.unwrap(); + +// // instantiate a (non batched) Findex to verify the results +// let findex = Findex::new( +// trivial_memory.clone(), +// dummy_encode::, +// dummy_decode, +// ); + +// let cat_result = findex.search(&"cat".to_string()).await.unwrap(); +// assert_eq!(cat_result, cat_bindings.into_iter().collect::>()); + +// let dog_result = findex.search(&"dog".to_string()).await.unwrap(); +// assert_eq!(dog_result, dog_bindings.into_iter().collect::>()); +// } + +// #[tokio::test] +// async fn test_batch_delete() { +// let trivial_memory = InMemory::, [u8; WORD_LENGTH]>::default(); + +// // First, populate the memory with initial data using regular Findex +// let findex = Findex::new( +// trivial_memory.clone(), +// dummy_encode::, +// dummy_decode, +// ); + +// let cat_bindings = vec![ +// Value::try_from(1).unwrap(), +// Value::try_from(3).unwrap(), +// Value::try_from(5).unwrap(), +// Value::try_from(7).unwrap(), +// ]; +// let dog_bindings = vec![ +// Value::try_from(0).unwrap(), +// Value::try_from(2).unwrap(), +// Value::try_from(4).unwrap(), +// Value::try_from(6).unwrap(), +// ]; + +// findex +// .insert("cat".to_string(), cat_bindings.clone()) +// .await +// .unwrap(); +// findex +// .insert("dog".to_string(), dog_bindings.clone()) +// .await +// .unwrap(); + +// // Create BatcherFindex for deletion operations +// let batcher_findex = BatcherFindex::::new( +// trivial_memory.clone(), +// |op, values| { +// dummy_encode::(op, values) +// .map_err(|e| Error::>::BatchedOperationError(e)) +// }, +// |words| { +// dummy_decode(words) +// .map_err(|e| Error::>::BatchedOperationError(e)) +// }, +// ); + +// let delete_entries = vec![ +// ( +// "cat".to_string(), +// vec![Value::try_from(1).unwrap(), Value::try_from(5).unwrap()], +// ), +// ("dog".to_string(), dog_bindings), // Remove all dog bindings +// ]; + +// // Perform batch delete +// batcher_findex.batch_delete(delete_entries).await.unwrap(); + +// // Verify deletions were performed using a regular findex instance +// let cat_result = findex.search(&"cat".to_string()).await.unwrap(); +// let dog_result = 
findex.search(&"dog".to_string()).await.unwrap();

// let expected_cat = vec![
//     Value::try_from(3).unwrap(), // 1 and 5 removed, 3 and 7 remain
//     Value::try_from(7).unwrap(),
// ]
// .into_iter()
// .collect::<HashSet<_>>();
// let expected_dog = HashSet::new(); // all of the dog bindings are removed

// assert_eq!(cat_result, expected_cat);
// assert_eq!(dog_result, expected_dog);
// }

// #[tokio::test]
// async fn test_batch_search() {
//     let trivial_memory = InMemory::<Address<ADDRESS_LENGTH>, [u8; WORD_LENGTH]>::default();

//     let findex = Findex::new(
//         trivial_memory.clone(),
//         dummy_encode::<WORD_LENGTH, _>,
//         dummy_decode,
//     );
//     let cat_bindings = [
//         Value::try_from(1).unwrap(),
//         Value::try_from(3).unwrap(),
//         Value::try_from(5).unwrap(),
//     ];
//     let dog_bindings = [
//         Value::try_from(0).unwrap(),
//         Value::try_from(2).unwrap(),
//         Value::try_from(4).unwrap(),
//     ];
//     findex
//         .insert("cat".to_string(), cat_bindings.clone())
//         .await
//         .unwrap();
//     findex
//         .insert("dog".to_string(), dog_bindings.clone())
//         .await
//         .unwrap();

//     let batcher_findex = BatcherFindex::<WORD_LENGTH, Value, _, _>::new(
//         trivial_memory,
//         |op, values| {
//             dummy_encode::<WORD_LENGTH, _>(op, values)
//                 .map_err(|e| Error::<Address<ADDRESS_LENGTH>>::BatchedOperationError(e))
//         },
//         |words| {
//             dummy_decode(words)
//                 .map_err(|e| Error::<Address<ADDRESS_LENGTH>>::BatchedOperationError(e))
//         },
//     );

//     let key1 = "cat".to_string();
//     let key2 = "dog".to_string();
//     // Perform batch search
//     let batch_search_results = batcher_findex
//         .batch_search(vec![&key1, &key2])
//         .await
//         .unwrap();

//     assert_eq!(
//         batch_search_results,
//         vec![
//             cat_bindings.iter().cloned().collect::<HashSet<_>>(),
//             dog_bindings.iter().cloned().collect::<HashSet<_>>()
//         ]
//     );
// }
// }
diff --git a/src/encoding.rs b/crate/findex/src/encoding.rs
similarity index 100%
rename from src/encoding.rs
rename to crate/findex/src/encoding.rs
diff --git a/crate/findex/src/error.rs b/crate/findex/src/error.rs
new file mode 100644
index 00000000..38f672b2
--- /dev/null
+++ b/crate/findex/src/error.rs
@@ -0,0 +1,55 @@
+use std::fmt::{Debug, Display};
+
+use crate::{MemoryADT, memory::BatchingLayerError};
+
+#[derive(Debug)]
+pub enum BatchFindexError<M: MemoryADT> {
+    BatchingLayerError(BatchingLayerError<M>),
+    FindexError(Error<M::Address>),
+}
+
+impl<M: MemoryADT> Display for BatchFindexError<M>
+where
+    <M as MemoryADT>::Address: Debug,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            BatchFindexError::BatchingLayerError(e) => write!(f, "Batching layer error: {e}"),
+            BatchFindexError::FindexError(error) => write!(f, "Findex error: {error:?}"),
+        }
+    }
+}
+
+impl<M: MemoryADT> From<Error<M::Address>> for BatchFindexError<M> {
+    fn from(e: Error<M::Address>) -> Self {
+        BatchFindexError::FindexError(e)
+    }
+}
+
+impl<M: MemoryADT> From<BatchingLayerError<M>> for BatchFindexError<M> {
+    fn from(e: BatchingLayerError<M>) -> Self {
+        BatchFindexError::BatchingLayerError(e)
+    }
+}
+
+impl<M: MemoryADT> std::error::Error for BatchFindexError<M> where
+    <M as MemoryADT>::Address: Debug
+{
+}
+
+#[derive(Debug)]
+pub enum Error<Address> {
+    Parsing(String),
+    Memory(String),
+    Conversion(String),
+    MissingValue(Address, usize),
+    CorruptedMemoryCache,
+}
+
+impl<Address: Debug> Display for Error<Address> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{self:?}")
+    }
+}
+
+impl<Address: Debug> std::error::Error for Error<Address> {}
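Since both layers' errors convert into `BatchFindexError` through the `From` implementations above, callers can rely on `?` to unify them. A minimal sketch (the `demo` function and its inputs are hypothetical):

```rust
// Sketch: `?` lifts both error types into `BatchFindexError<M>`.
async fn demo<M: MemoryADT>(
    findex_result: Result<(), Error<M::Address>>,
    layer_result: Result<(), BatchingLayerError<M>>,
) -> Result<(), BatchFindexError<M>>
where
    M::Error: std::fmt::Debug,
{
    findex_result?; // via From<Error<M::Address>>
    layer_result?;  // via From<BatchingLayerError<M>>
    Ok(())
}
```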
diff --git a/src/findex.rs b/crate/findex/src/findex.rs
similarity index 89%
rename from src/findex.rs
rename to crate/findex/src/findex.rs
index 418ccfe5..2db627b4 100644
--- a/src/findex.rs
+++ b/crate/findex/src/findex.rs
@@ -141,11 +141,10 @@ mod tests {
         memory::MemoryEncryptionLayer,
     };
     use cosmian_crypto_core::{CsRng, Secret, define_byte_type, reexport::rand_core::SeedableRng};
-    use futures::executor::block_on;
     use std::collections::HashSet;
 
-    #[test]
-    fn test_insert_search_delete_search() {
+    #[tokio::test]
+    async fn test_insert_search_delete_search() {
         // Define a byte type, and use `Value` as an alias for 8-bytes values of
         // that type.
         type Value = Bytes<8>;
@@ -167,6 +166,7 @@ mod tests {
             &seed,
             InMemory::<Address<ADDRESS_LENGTH>, [u8; WORD_LENGTH]>::default(),
         );
+
         let findex = Findex::new(memory, dummy_encode::<WORD_LENGTH, _>, dummy_decode);
 
         let cat_bindings = [
             Value::try_from(1).unwrap(),
             Value::try_from(3).unwrap(),
             Value::try_from(5).unwrap(),
         ];
         let dog_bindings = [
             Value::try_from(0).unwrap(),
             Value::try_from(2).unwrap(),
             Value::try_from(4).unwrap(),
         ];
-        block_on(findex.insert("cat".to_string(), cat_bindings.clone())).unwrap();
-        block_on(findex.insert("dog".to_string(), dog_bindings.clone())).unwrap();
-        let cat_res = block_on(findex.search(&"cat".to_string())).unwrap();
-        let dog_res = block_on(findex.search(&"dog".to_string())).unwrap();
+        findex
+            .insert("cat".to_string(), cat_bindings.clone())
+            .await
+            .unwrap();
+        findex
+            .insert("dog".to_string(), dog_bindings.clone())
+            .await
+            .unwrap();
+        let cat_res = findex.search(&"cat".to_string()).await.unwrap();
+        let dog_res = findex.search(&"dog".to_string()).await.unwrap();
         assert_eq!(
             cat_bindings.iter().cloned().collect::<HashSet<_>>(),
             cat_res
         );
         assert_eq!(
             dog_bindings.iter().cloned().collect::<HashSet<_>>(),
             dog_res
         );
 
-        block_on(findex.delete("dog", dog_bindings)).unwrap();
-        block_on(findex.delete("cat", cat_bindings)).unwrap();
-        let cat_res = block_on(findex.search(&"cat".to_string())).unwrap();
-        let dog_res = block_on(findex.search(&"dog".to_string())).unwrap();
+        findex.delete("dog", dog_bindings).await.unwrap();
+        findex.delete("cat", cat_bindings).await.unwrap();
+        let cat_res = findex.search(&"cat".to_string()).await.unwrap();
+        let dog_res = findex.search(&"dog".to_string()).await.unwrap();
         assert_eq!(HashSet::new(), cat_res);
         assert_eq!(HashSet::new(), dog_res);
     }
diff --git a/src/lib.rs b/crate/findex/src/lib.rs
similarity index 83%
rename from src/lib.rs
rename to crate/findex/src/lib.rs
index b5dc2689..84c63895 100644
--- a/src/lib.rs
+++ b/crate/findex/src/lib.rs
@@ -12,6 +12,7 @@
 mod error;
 mod findex;
 mod memory;
 mod ovec;
+
 #[cfg(any(test, feature = "test-utils"))]
 mod test_utils;
 
@@ -29,15 +30,6 @@ pub use memory::{KEY_LENGTH, MemoryEncryptionLayer};
 #[cfg(any(test, feature = "test-utils"))]
 pub use test_utils::*;
 
-#[cfg(feature = "redis-mem")]
-pub use memory::{RedisMemory, RedisMemoryError};
-
-#[cfg(feature = "sqlite-mem")]
-pub use memory::{SqliteMemory, SqliteMemoryError};
-
-#[cfg(feature = "postgres-mem")]
-pub use memory::{PostgresMemory, PostgresMemoryError};
-
 #[cfg(any(test, feature = "test-utils"))]
 pub use encoding::{
     dummy_encoding::{WORD_LENGTH, dummy_decode, dummy_encode},
@@ -50,3 +42,6 @@ pub use memory::InMemory;
 /// 16-byte addresses ensure a high collision resistance that poses virtually no
 /// limitation on the index.
 pub const ADDRESS_LENGTH: usize = 16;
+
+// TODO: clean this later
+mod batcher_findex;
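The "high collision resistance" claim in this doc-comment can be made concrete with the birthday bound: drawing $N$ random 16-byte (128-bit) addresses produces a collision with probability roughly

$$P_{\text{coll}} \lesssim \frac{N^2}{2 \cdot 2^{128}} = \frac{N^2}{2^{129}},$$

so even an index holding $N = 2^{50}$ bindings collides with probability below $2^{-29}$.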
diff --git a/crate/findex/src/memory/batching_layer.rs b/crate/findex/src/memory/batching_layer.rs
new file mode 100644
index 00000000..306bd345
--- /dev/null
+++ b/crate/findex/src/memory/batching_layer.rs
@@ -0,0 +1,333 @@
+// ---------------------------------- MemoryBatcher structure ----------------------------------
+// It takes as inner memory any memory that implements the batching ADT,
+// i.e. `MemoryADT` plus the `batch_guarded_write` function.
+
+use crate::{MemoryADT, adt::BatchingMemoryADT};
+use futures::channel::oneshot::{self, Canceled};
+use std::fmt::Debug;
+use std::{
+    fmt::Display,
+    marker::PhantomData,
+    sync::{
+        Arc, Mutex,
+        atomic::{AtomicUsize, Ordering},
+    },
+};
+enum PendingOperations<M: MemoryADT>
+where
+    M::Address: Clone,
+{
+    PendingReads(
+        Mutex<
+            Vec<(
+                Vec<M::Address>,
+                oneshot::Sender<Result<Vec<Option<M::Word>>, M::Error>>,
+            )>,
+        >,
+    ),
+    PendingWrites(
+        Mutex<
+            Vec<(
+                ((M::Address, Option<M::Word>), Vec<(M::Address, M::Word)>),
+                oneshot::Sender<Result<Option<M::Word>, M::Error>>,
+            )>,
+        >,
+    ),
+}
+
+pub struct MemoryBatcher<M: MemoryADT>
+where
+    M::Address: Clone,
+{
+    inner: M, // the actual memory layer that implements the actual network / memory call
+    capacity: AtomicUsize, // capacity at which the operation should be executed
+    buffer: PendingOperations<M>,
+}
+
+#[derive(Debug)]
+pub enum BatchingLayerError<M: MemoryADT>
+where
+    M::Error: Debug,
+{
+    MemoryError(M::Error),
+    MutexError(String),
+    ChannelError(String),
+    _Phantom(PhantomData<M>),
+}
+
+impl<M: MemoryADT> Display for BatchingLayerError<M>
+where
+    M::Error: Debug,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            BatchingLayerError::MemoryError(err) => write!(f, "Memory error: {:?}", err),
+            BatchingLayerError::MutexError(msg) => write!(f, "Mutex error: {}", msg),
+            BatchingLayerError::ChannelError(_) => {
+                write!(f, "Channel closed unexpectedly.")
+            }
+            BatchingLayerError::_Phantom(_) => panic!("This variant should never be constructed"),
+        }
+    }
+}
+impl<M: MemoryADT> From<Canceled> for BatchingLayerError<M>
+where
+    M::Error: Debug,
+{
+    fn from(_: Canceled) -> Self {
+        BatchingLayerError::ChannelError(
+            "The sender was dropped before sending its results with the `send` function."
+                .to_string(),
+        )
+    }
+}
+
+impl<T, M: MemoryADT> From<std::sync::PoisonError<T>> for BatchingLayerError<M>
+where
+    M::Error: Debug,
+{
+    fn from(e: std::sync::PoisonError<T>) -> Self {
+        BatchingLayerError::MutexError(format!("Mutex lock poisoned: {e}"))
+    }
+}
+
+impl<M: MemoryADT> std::error::Error for BatchingLayerError<M>
+where
+    M: Debug,
+    M::Error: Debug,
+{
+}
+
+impl<M: BatchingMemoryADT> MemoryBatcher<M>
+where
+    <M as MemoryADT>::Address: Clone,
+{
+    pub fn new_reader(inner: M, capacity: AtomicUsize) -> Self {
+        Self {
+            inner,
+            capacity,
+            buffer: PendingOperations::PendingReads(Mutex::new(Vec::new())),
+        }
+    }
+
+    pub fn new_writer(inner: M, capacity: AtomicUsize) -> Self {
+        Self {
+            inner,
+            capacity,
+            buffer: PendingOperations::PendingWrites(Mutex::new(Vec::new())),
+        }
+    }
+
+    // atomically decrement the buffer size, needed on inserts/deletes;
+    // the flush future is awaited so pending operations cannot be silently dropped
+    pub(crate) async fn decrement_capacity(&self) -> Result<(), BatchingLayerError<M>> {
+        // `fetch_sub` returns the previous value, so if it was 1, it means the buffer's job is done
+        let previous = self.capacity.fetch_sub(1, Ordering::SeqCst);
+        match &self.buffer {
+            PendingOperations::PendingReads(read_ops) => {
+                if previous <= read_ops.lock()?.len() {
+                    self.flush().await?;
+                }
+            }
+            PendingOperations::PendingWrites(write_ops) => {
+                if previous <= write_ops.lock()?.len() {
+                    self.flush().await?;
+                }
+            }
+        }
+        Ok(())
+    }
+
+    async fn flush(&self) -> Result<(), BatchingLayerError<M>> {
+        match &self.buffer {
+            PendingOperations::PendingReads(read_ops) => {
+                // maybe add a check that the capacities are correct
+                let batches: Vec<(
+                    Vec<M::Address>,
+                    oneshot::Sender<Result<Vec<Option<M::Word>>, M::Error>>,
+                )> = {
+                    let mut pending = read_ops.lock().unwrap();
+                    if pending.is_empty() {
+                        return Ok(());
+                    }
+                    std::mem::take(&mut *pending)
+                };
+
+                // Build combined address list while tracking which addresses belong to which batch
+                let all_addresses: Vec<_> = batches
+                    .iter()
+                    .flat_map(|(addresses, _)| addresses.iter())
+                    .cloned()
+                    .collect();
+
+                let mut all_results = self
+                    .inner
+                    .batch_read(all_addresses)
+                    .await
+                    // Implementing the adequate `From` trait for this error seems impossible due to
+                    // a conflicting implementation in crate `core`
+                    .map_err(BatchingLayerError::<M>::MemoryError)?;
+
+                // Distribute results to each batch's sender
+                for (input, sender) in batches.into_iter().rev() {
+                    let split_point = all_results.len() - input.len();
+                    // After this call, all_results will be left containing the elements [0, split_point);
+                    // that's why we need to reverse the batches
+                    let batch_results = all_results.split_off(split_point);
+                    sender.send(Ok(batch_results)).map_err(|_| {
+                        BatchingLayerError::<M>::ChannelError(
+                            "The receiver end of this read operation was dropped before the `send` function could be called."
+                                .to_owned(),
+                        )
+                    })?;
+                }
+            }
+            PendingOperations::PendingWrites(write_ops) => {
+                // maybe add a check that the capacities are correct
+                let batches = {
+                    let mut pending = write_ops.lock().unwrap();
+                    if pending.is_empty() {
+                        return Ok(());
+                    }
+                    std::mem::take(&mut *pending)
+                };
+
+                let (bindings, senders): (Vec<_>, Vec<_>) = batches.into_iter().unzip();
+
+                let res = self
+                    .inner
+                    .batch_guarded_write(bindings)
+                    .await
+                    .map_err(BatchingLayerError::<M>::MemoryError)?;
+                // Distribute results to each batch's sender
+                for (res, sender) in res.into_iter().zip(senders) {
+                    sender.send(Ok(res)).map_err(|_| {
+                        BatchingLayerError::<M>::ChannelError(
+                            "The receiver end of this write operation was dropped before the `send` function could be called."
+                                .to_owned(),
+                        )
+                    })?;
+                }
+            }
+        };
+        Ok(())
+    }
+}
+
+impl<M: BatchingMemoryADT> MemoryADT for MemoryBatcher<M>
+where
+    M::Address: Clone + Send,
+    M::Word: Send,
+{
+    type Address = M::Address;
+    type Word = M::Word;
+    type Error = BatchingLayerError<M>;
+
+    async fn batch_read(
+        &self,
+        addresses: Vec<Self::Address>,
+    ) -> Result<Vec<Option<Self::Word>>, Self::Error> {
+        match &self.buffer {
+            PendingOperations::PendingWrites(_) => panic!(
+                "`batch_read` is called on a writer MemoryBatcher, make sure to use `new_reader` during initialization."
+            ),
+            PendingOperations::PendingReads(read_ops) => {
+                // Create a channel for this batch
+                let (sender, receiver) = oneshot::channel();
+                let should_flush;
+
+                // Add to pending batches
+                {
+                    let mut pending = read_ops.lock().unwrap();
+                    pending.push((addresses, sender));
+
+                    // Determine if we should flush
+                    should_flush = pending.len() == self.capacity.load(Ordering::SeqCst);
+                    if pending.len() > self.capacity.load(Ordering::SeqCst) {
+                        panic!(
+                            "this isn't supposed to happen, by design; change this to an error case later"
+                        )
+                    }
+                }
+                // Flush if buffer is full
+                // only 1 thread will have this equal to true
+                if should_flush {
+                    self.flush().await?;
+                }
+
+                // Wait for results
+                receiver
+                    .await?
+                    .map_err(|e| BatchingLayerError::<M>::MemoryError(e))
+            }
+        }
+    }
+
+    async fn guarded_write(
+        &self,
+        guard: (Self::Address, Option<Self::Word>),
+        bindings: Vec<(Self::Address, Self::Word)>,
+    ) -> Result<Option<Self::Word>, Self::Error> {
+        match &self.buffer {
+            PendingOperations::PendingReads(_) => panic!(
+                "`guarded_write` is called on a reader MemoryBatcher, make sure to use `new_writer` during initialization."
+            ),
+            PendingOperations::PendingWrites(write_ops) => {
+                let (sender, receiver) = oneshot::channel();
+                let should_flush;
+
+                // Add to pending batches
+                {
+                    let mut pending = write_ops.lock().unwrap();
+                    pending.push(((guard, bindings), sender));
+
+                    let capacity = self.capacity.load(Ordering::SeqCst);
+                    if pending.len() > capacity {
+                        // TODO: determine if this should be kept
+                        panic!(
+                            "this isn't supposed to happen, by design; change this to an error case later"
+                        )
+                    }
+                    should_flush = pending.len() == capacity;
+                }
+
+                // Flush if buffer is full
+                // only the caller thread will have this equal to true
+                if should_flush {
+                    self.flush().await?;
+                }
+
+                // Wait for results
+                receiver
+                    .await?
+                    .map_err(|e| BatchingLayerError::<M>::MemoryError(e))
+            }
+        }
+    }
+}
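To make the coalescing protocol above concrete, here is a hedged sketch of the intended interaction, assuming the test-only `InMemory` store (which gains a `BatchingMemoryADT` implementation later in this diff) and hypothetical addresses `a1`–`a3`: a reader batcher of capacity 2 merges two concurrent `batch_read` calls into a single round-trip on the inner memory, and each caller receives its own slice of the results over its oneshot channel.

```rust
use std::sync::{atomic::AtomicUsize, Arc};

// Sketch: two concurrent reads, one inner round-trip.
async fn coalesced_reads(
    memory: InMemory<[u8; 16], [u8; 16]>,
    a1: [u8; 16],
    a2: [u8; 16],
    a3: [u8; 16],
) {
    let batcher = Arc::new(MemoryBatcher::new_reader(memory, AtomicUsize::new(2)));
    let (r1, r2) = tokio::join!(
        batcher.batch_read(vec![a1]),     // buffered: 1 of 2
        batcher.batch_read(vec![a2, a3]), // reaches capacity and triggers `flush`
    );
    // The inner memory served one combined `batch_read([a1, a2, a3])`;
    // `r1` receives the first result and `r2` the remaining two.
    assert_eq!(r1.unwrap().len(), 1);
    assert_eq!(r2.unwrap().len(), 2);
}
```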
+
+// This simply forwards the BR/GW calls to the inner memory
+// when findex instances (below) call the batcher's operations
+impl<M: BatchingMemoryADT> MemoryADT for Arc<MemoryBatcher<M>>
+where
+    M::Address: Send + Clone,
+    M::Word: Send,
+{
+    type Address = M::Address;
+    type Word = M::Word;
+    type Error = BatchingLayerError<M>;
+
+    async fn batch_read(
+        &self,
+        addresses: Vec<Self::Address>,
+    ) -> Result<Vec<Option<Self::Word>>, Self::Error> {
+        (**self).batch_read(addresses).await
+    }
+
+    async fn guarded_write(
+        &self,
+        guard: (Self::Address, Option<Self::Word>),
+        bindings: Vec<(Self::Address, Self::Word)>,
+    ) -> Result<Option<Self::Word>, Self::Error> {
+        (**self).guarded_write(guard, bindings).await
+    }
+}
diff --git a/src/memory/encryption_layer.rs b/crate/findex/src/memory/encryption_layer.rs
similarity index 98%
rename from src/memory/encryption_layer.rs
rename to crate/findex/src/memory/encryption_layer.rs
index 57254773..c48025ae 100644
--- a/src/memory/encryption_layer.rs
+++ b/crate/findex/src/memory/encryption_layer.rs
@@ -140,7 +140,7 @@ mod tests {
     use crate::{
         ADDRESS_LENGTH,
         address::Address,
-        memory::{MemoryEncryptionLayer, in_memory_store::InMemory},
+        memory::{MemoryEncryptionLayer, in_memory::InMemory},
         test_utils::{
             gen_seed, test_guarded_write_concurrent, test_single_write_and_read, test_wrong_guard,
         },
diff --git a/src/memory/in_memory_store.rs b/crate/findex/src/memory/in_memory.rs
similarity index 70%
rename from src/memory/in_memory_store.rs
rename to crate/findex/src/memory/in_memory.rs
index 78ca9b9b..324b83ff 100644
--- a/src/memory/in_memory_store.rs
+++ b/crate/findex/src/memory/in_memory.rs
@@ -1,3 +1,5 @@
+//! A thread-safe implementation of the `MemoryADT` trait based on a `HashMap`.
+
 use std::{
     collections::HashMap,
     fmt::{Debug, Display},
@@ -5,7 +7,7 @@ use std::{
     sync::{Arc, Mutex},
 };
 
-use crate::MemoryADT;
+use crate::{MemoryADT, adt::BatchingMemoryADT};
 
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct MemoryError;
 
@@ -74,6 +76,29 @@
 
+impl<Address: Send + Sync + Hash + Eq + Debug + Clone, Value: Send + Sync + Clone + Eq + Debug>
+    BatchingMemoryADT for InMemory<Address, Value>
+{
+    async fn batch_guarded_write(
+        &self,
+        operations: Vec<((Address, Option<Value>), Vec<(Address, Value)>)>,
+    ) -> Result<Vec<Option<Value>>, Self::Error> {
+        let store = &mut *self.inner.lock().expect("poisoned lock");
+        let mut res = Vec::with_capacity(operations.len());
+        for (guard, bindings) in operations {
+            let (a, old) = guard;
+            let cur = store.get(&a).cloned();
+            if old == cur {
+                for (k, v) in bindings {
+                    store.insert(k, v);
+                }
+            }
+            res.push(cur);
+        }
+        Ok(res)
+    }
+}
+
 impl<Address: Send + Sync + Hash + Eq + Debug + Clone, Value: Send + Sync + Clone + Eq + Debug> IntoIterator
     for InMemory<Address, Value>
 {
@@ -96,7 +121,10 @@ mod tests {
 
     use super::InMemory;
     use crate::{
         test_utils::gen_seed,
-        test_utils::{test_guarded_write_concurrent, test_single_write_and_read, test_wrong_guard},
+        test_utils::{
+            test_guarded_write_concurrent, test_rw_same_address, test_single_write_and_read,
+            test_wrong_guard,
+        },
     };
 
     #[tokio::test]
     async fn test_sequential_read_write() {
         let memory = InMemory::<[u8; 16], [u8; 16]>::default();
         test_single_write_and_read(&memory, gen_seed()).await;
     }
 
     #[tokio::test]
     async fn test_sequential_wrong_guard() {
         let memory = InMemory::<[u8; 16], [u8; 16]>::default();
         test_wrong_guard(&memory, gen_seed()).await;
     }
 
+    #[tokio::test]
+    async fn test_sequential_rw_same_address() {
+        let memory = InMemory::<[u8; 16], [u8; 16]>::default();
+        test_rw_same_address(&memory, gen_seed()).await;
+    }
+
     #[tokio::test]
     async fn test_concurrent_read_write() {
         let memory = InMemory::<[u8; 16], [u8; 16]>::default();
         test_guarded_write_concurrent(&memory, gen_seed(), Option::None).await;
     }
 }
diff --git a/crate/findex/src/memory/mod.rs b/crate/findex/src/memory/mod.rs
new file mode 100644
index 00000000..f9900d29
--- /dev/null
+++ b/crate/findex/src/memory/mod.rs
@@ -0,0 +1,10 @@
+mod encryption_layer;
+pub use encryption_layer::{KEY_LENGTH, MemoryEncryptionLayer};
+
+#[cfg(any(test, feature = "test-utils"))]
+mod in_memory;
+#[cfg(any(test, feature = "test-utils"))] +pub use in_memory::InMemory; + +pub mod batching_layer; +pub use batching_layer::*; diff --git a/src/ovec.rs b/crate/findex/src/ovec.rs similarity index 100% rename from src/ovec.rs rename to crate/findex/src/ovec.rs diff --git a/src/test_utils/benches.rs b/crate/findex/src/test_utils/benches.rs similarity index 91% rename from src/test_utils/benches.rs rename to crate/findex/src/test_utils/benches.rs index c5b01aa7..7c039077 100644 --- a/src/test_utils/benches.rs +++ b/crate/findex/src/test_utils/benches.rs @@ -1,10 +1,14 @@ +//! This module provides a comprehensive benchmarking suite for testing the +//! performance of Findex memory implementations. These benchmarks are designed +//! to be generic and work with any memory back end that implements the MemoryADT +//! trait. + use crate::{ ADDRESS_LENGTH, Address, Findex, IndexADT, MemoryADT, MemoryEncryptionLayer, WORD_LENGTH, dummy_decode, dummy_encode, }; use cosmian_crypto_core::{Secret, reexport::rand_core::CryptoRngCore}; use criterion::{BenchmarkId, Criterion}; -use futures::future::join_all; use std::{collections::HashSet, fmt::Debug, sync::Arc}; use tokio::runtime::{Builder, Runtime}; @@ -57,9 +61,6 @@ pub fn bench_memory_search_multiple_bindings< c: &mut Criterion, rng: &mut impl CryptoRngCore, ) { - // Redis memory backend requires a tokio runtime, and all operations to - // happen in the same runtime, otherwise the connection returns a broken - // pipe error. let rt = Runtime::new().unwrap(); let findex = Findex::new( @@ -98,9 +99,6 @@ pub fn bench_memory_search_multiple_keywords< c: &mut Criterion, rng: &mut impl CryptoRngCore, ) { - // Redis memory backend requires a tokio runtime, and all operations to - // happen in the same runtime, otherwise the connection returns a broken - // pipe error. let rt = Runtime::new().unwrap(); let findex = Arc::new(Findex::new( @@ -139,8 +137,8 @@ pub fn bench_memory_search_multiple_keywords< let findex = findex.clone(); handles.push(tokio::spawn(async move { findex.search(&kw).await })) } - for res in join_all(handles).await { - res.unwrap().unwrap(); + for res in handles { + res.await.expect("Search task failed").unwrap(); } }) }, @@ -167,9 +165,6 @@ pub fn bench_memory_insert_multiple_bindings< clear: impl AsyncFn(&Memory) -> Result<(), E>, rng: &mut impl CryptoRngCore, ) { - // Redis memory backend requires a tokio runtime, and all operations to - // happen in the same runtime, otherwise the connection returns a broken - // pipe error. let rt = Runtime::new().unwrap(); let mut m = rt.block_on(m()); @@ -222,9 +217,6 @@ pub fn bench_memory_contention< ) { const N_CLIENTS: usize = 10; - // Redis memory backend requires a tokio runtime, and all operations to - // happen in the same runtime, otherwise the connection returns a broken - // pipe error. 
let rt = Builder::new_multi_thread().enable_all().build().unwrap(); let m = rt.block_on(m()); @@ -256,10 +248,14 @@ pub fn bench_memory_contention< }, |iterator| { rt.block_on(async { - join_all(iterator.map(|(findex, (kw, vs))| { - tokio::spawn(async move { findex.insert(kw, vs).await }) - })) - .await + let handles = iterator + .map(|(findex, (kw, vs))| { + tokio::spawn(async move { findex.insert(kw, vs).await }) + }) + .collect::>(); + for h in handles { + h.await.expect("Insert task failed").unwrap(); + } }) }, criterion::BatchSize::SmallInput, @@ -297,7 +293,9 @@ pub fn bench_memory_contention< let h = tokio::spawn(async move { findex.insert(kw, vs).await }); handles.push(h); } - join_all(handles).await + for h in handles { + h.await.expect("Insert task failed").unwrap(); + } }) }, criterion::BatchSize::SmallInput, @@ -326,9 +324,6 @@ pub fn bench_memory_one_to_many< ) { const MAX_VAL: usize = 100; - // Redis memory backend requires a tokio runtime, and all operations to - // happen in the same runtime, otherwise the connection returns a broken - // pipe error. let rt = Builder::new_multi_thread().enable_all().build().unwrap(); let m = rt.block_on(m()); diff --git a/src/test_utils/memory_tests.rs b/crate/findex/src/test_utils/memory_tests.rs similarity index 100% rename from src/test_utils/memory_tests.rs rename to crate/findex/src/test_utils/memory_tests.rs diff --git a/src/test_utils/mod.rs b/crate/findex/src/test_utils/mod.rs similarity index 100% rename from src/test_utils/mod.rs rename to crate/findex/src/test_utils/mod.rs diff --git a/crate/memories/Cargo.toml b/crate/memories/Cargo.toml new file mode 100644 index 00000000..a7dbf7a6 --- /dev/null +++ b/crate/memories/Cargo.toml @@ -0,0 +1,55 @@ +[package] +name = "cosmian_findex_memories" +version.workspace = true +authors.workspace = true +license.workspace = true +edition.workspace = true +categories.workspace = true +keywords.workspace = true +repository.workspace = true +description.workspace = true +readme = "README.md" + +[lib] +name = "cosmian_findex_memories" +path = "src/lib.rs" + +[features] +redis-mem = ["redis"] +sqlite-mem = ["async-sqlite"] +postgres-mem = ["deadpool-postgres", "tokio", "tokio-postgres"] +test-utils = ["cosmian_findex/test-utils"] + +[dependencies] +async-sqlite = { version = "0.5", optional = true } +cosmian_findex = { path = "../findex", version = "8.0" } +deadpool-postgres = { version = "0.14", optional = true } +redis = { version = "0.32", default-features = false, features = [ + "connection-manager", + "tokio-comp", +], optional = true } +tokio = { workspace = true, features = ["rt-multi-thread"], optional = true } +tokio-postgres = { version = "0.7", optional = true, features = [ + "array-impls", +] } + + +[dev-dependencies] +cosmian_crypto_core.workspace = true +cosmian_findex = { path = "../findex", version = "8.0", features = [ + "test-utils", +] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +criterion = { workspace = true } +futures = { version = "0.3" } +rand = "0.9" +rand_distr = "0.5" + +[[bench]] +name = "benches" +harness = false +required-features = ["redis-mem", "sqlite-mem", "postgres-mem", "test-utils"] + +[[example]] +name = "example" +required-features = ["redis-mem", "sqlite-mem", "postgres-mem"] diff --git a/crate/memories/README.md b/crate/memories/README.md new file mode 100644 index 00000000..a35af3bf --- /dev/null +++ b/crate/memories/README.md @@ -0,0 +1,36 @@ +# Findex Memories + +A collection of memory implementations for Findex, a 
concurrent and database-agnostic Searchable Encryption scheme.
+Findex memories provide compatibility with concrete databases, allowing the core Findex library to remain database-agnostic. This separation enables users to integrate Findex with their preferred database system.
+
+## Setup
+
+First, add `cosmian_findex_memories` as a dependency to your project:
+
+```bash
+cargo add cosmian_findex_memories # do not forget to enable the adequate feature for the back end you want to use!
+```
+
+If you don't have a `Redis` or `Postgres` instance running, you can use the [`docker-compose.yml`](./docker-compose.yml) file provided with this repository by running `docker-compose up`.
+
+For detailed usage examples, refer to the [examples folder](examples).
+
+## Available Storage Back-ends
+
+This library provides implementations for the following storage systems:
+
+| Feature        | Database   | Dependencies |
+| -------------- | ---------- | ------------ |
+| `redis-mem`    | Redis      | [redis](https://crates.io/crates/redis) v0.32 |
+| `sqlite-mem`   | SQLite     | [async-sqlite](https://crates.io/crates/async-sqlite) v0.5 |
+| `postgres-mem` | PostgreSQL | [tokio-postgres](https://crates.io/crates/tokio-postgres) v0.7.9 <br>
[tokio](https://crates.io/crates/tokio) v1.45 <br>
[deadpool-postgres](https://crates.io/crates/deadpool-postgres) v0.14.1 | + +To execute the PostgreSQL tests and run the benches locally with your postgres installation, the easiest way would be to add the following service to your pg_service.conf file (usually under `~/.pg_service.conf`): + +```toml +[cosmian_service] +host=localhost +dbname=cosmian +user=cosmian +password=cosmian +``` diff --git a/benches/benches.rs b/crate/memories/benches/benches.rs similarity index 75% rename from benches/benches.rs rename to crate/memories/benches/benches.rs index 22d5ee16..fe1f15d6 100644 --- a/benches/benches.rs +++ b/crate/memories/benches/benches.rs @@ -2,23 +2,26 @@ // activated. #![allow(unused_imports, unused_variables, unused_mut, dead_code)] -use cosmian_crypto_core::{CsRng, reexport::rand_core::SeedableRng}; +use cosmian_crypto_core::{ + CsRng, + reexport::rand_core::{RngCore, SeedableRng}, +}; use cosmian_findex::{ bench_memory_contention, bench_memory_insert_multiple_bindings, bench_memory_one_to_many, bench_memory_search_multiple_bindings, bench_memory_search_multiple_keywords, }; use criterion::{Criterion, criterion_group, criterion_main}; -#[cfg(feature = "rust-mem")] -use cosmian_findex::InMemory; - #[cfg(feature = "sqlite-mem")] -use cosmian_findex::SqliteMemory; +use cosmian_findex_memories::SqliteMemory; #[cfg(feature = "sqlite-mem")] -const SQLITE_PATH: &str = "./target/benches.sqlite"; +const SQLITE_PATH: &str = "benches.sqlite.db"; +// Redis memory back-end requires a tokio runtime, and all operations to +// happen in the same runtime, otherwise the connection returns a broken +// pipe error. #[cfg(feature = "redis-mem")] -use cosmian_findex::RedisMemory; +use cosmian_findex_memories::RedisMemory; #[cfg(feature = "redis-mem")] fn get_redis_url() -> String { @@ -28,9 +31,8 @@ fn get_redis_url() -> String { ) } -/// Refer to `src/memory/postgresql_store/memory.rs` for local setup instructions #[cfg(feature = "postgres-mem")] -use cosmian_findex::{PostgresMemory, PostgresMemoryError}; +use cosmian_findex_memories::{PostgresMemory, PostgresMemoryError}; #[cfg(feature = "postgres-mem")] fn get_postgresql_url() -> String { @@ -54,14 +56,14 @@ async fn connect_and_init_table( pg_config.url = Some(db_url.to_string()); let test_pool = pg_config.builder(NoTls)?.build()?; - let m = PostgresMemory::, [u8; WORD_LENGTH]>::connect_with_pool( + let m = PostgresMemory::, [u8; WORD_LENGTH]>::new_with_pool( test_pool, table_name.to_string(), ) - .await?; + .await; + + m.initialize().await?; - m.initialize_table(db_url.to_string(), table_name, NoTls) - .await?; Ok(m) } // Number of points in each graph. 
@@ -70,20 +72,11 @@ const N_PTS: usize = 9; fn bench_search_multiple_bindings(c: &mut Criterion) { let mut rng = CsRng::from_entropy(); - #[cfg(feature = "rust-mem")] - bench_memory_search_multiple_bindings( - "in-memory", - N_PTS, - async || InMemory::default(), - c, - &mut rng, - ); - #[cfg(feature = "redis-mem")] bench_memory_search_multiple_bindings( "Redis", N_PTS, - async || RedisMemory::connect(&get_redis_url()).await.unwrap(), + async || RedisMemory::new_with_url(&get_redis_url()).await.unwrap(), c, &mut rng, ); @@ -92,7 +85,13 @@ fn bench_search_multiple_bindings(c: &mut Criterion) { bench_memory_search_multiple_bindings( "SQLite", N_PTS, - async || SqliteMemory::connect(SQLITE_PATH).await.unwrap(), + async || { + let m = SqliteMemory::new_with_path(SQLITE_PATH, "bench_memory_smd".to_string()) + .await + .unwrap(); + m.initialize().await.unwrap(); + m + }, c, &mut rng, ); @@ -102,12 +101,11 @@ fn bench_search_multiple_bindings(c: &mut Criterion) { "Postgres", N_PTS, async || { - connect_and_init_table( - get_postgresql_url(), - "bench_memory_search_multiple_bindings".to_string(), - ) - .await - .unwrap() + let m = connect_and_init_table(get_postgresql_url(), "bench_memory_smd".to_string()) + .await + .unwrap(); + m.initialize().await.unwrap(); + m }, c, &mut rng, @@ -117,20 +115,11 @@ fn bench_search_multiple_bindings(c: &mut Criterion) { fn bench_search_multiple_keywords(c: &mut Criterion) { let mut rng = CsRng::from_entropy(); - #[cfg(feature = "rust-mem")] - bench_memory_search_multiple_keywords( - "in-memory", - N_PTS, - async || InMemory::default(), - c, - &mut rng, - ); - #[cfg(feature = "redis-mem")] bench_memory_search_multiple_keywords( "Redis", N_PTS, - async || RedisMemory::connect(&get_redis_url()).await.unwrap(), + async || RedisMemory::new_with_url(&get_redis_url()).await.unwrap(), c, &mut rng, ); @@ -139,7 +128,13 @@ fn bench_search_multiple_keywords(c: &mut Criterion) { bench_memory_search_multiple_keywords( "SQLite", N_PTS, - async || SqliteMemory::connect(SQLITE_PATH).await.unwrap(), + async || { + let m = SqliteMemory::new_with_path(SQLITE_PATH, "bench_memory_smk".to_string()) + .await + .unwrap(); + m.initialize().await.unwrap(); + m + }, c, &mut rng, ); @@ -149,12 +144,9 @@ fn bench_search_multiple_keywords(c: &mut Criterion) { "Postgres", N_PTS, async || { - connect_and_init_table( - get_postgresql_url(), - "bench_memory_search_multiple_keywords".to_string(), - ) - .await - .unwrap() + connect_and_init_table(get_postgresql_url(), "bench_memory_smk".to_string()) + .await + .unwrap() }, c, &mut rng, @@ -164,24 +156,11 @@ fn bench_search_multiple_keywords(c: &mut Criterion) { fn bench_insert_multiple_bindings(c: &mut Criterion) { let mut rng = CsRng::from_entropy(); - #[cfg(feature = "rust-mem")] - bench_memory_insert_multiple_bindings( - "in-memory", - N_PTS, - async || InMemory::default(), - c, - async |m: &InMemory<_, _>| -> Result<(), String> { - m.clear(); - Ok(()) - }, - &mut rng, - ); - #[cfg(feature = "redis-mem")] bench_memory_insert_multiple_bindings( "Redis", N_PTS, - async || RedisMemory::connect(&get_redis_url()).await.unwrap(), + async || RedisMemory::new_with_url(&get_redis_url()).await.unwrap(), c, RedisMemory::clear, &mut rng, @@ -191,7 +170,13 @@ fn bench_insert_multiple_bindings(c: &mut Criterion) { bench_memory_insert_multiple_bindings( "SQLite", N_PTS, - async || SqliteMemory::connect(SQLITE_PATH).await.unwrap(), + async || { + let m = SqliteMemory::new_with_path(SQLITE_PATH, "bench_memory_imd".to_string()) + .await + .unwrap(); + 
m.initialize().await.unwrap(); + m + }, c, SqliteMemory::clear, &mut rng, @@ -202,12 +187,9 @@ fn bench_insert_multiple_bindings(c: &mut Criterion) { "Postgres", N_PTS, async || { - connect_and_init_table( - get_postgresql_url(), - "bench_memory_insert_multiple_bindings".to_string(), - ) - .await - .unwrap() + connect_and_init_table(get_postgresql_url(), "bench_memory_imd".to_string()) + .await + .unwrap() }, c, async |m: &PostgresMemory<_, _>| -> Result<(), String> { @@ -220,24 +202,11 @@ fn bench_insert_multiple_bindings(c: &mut Criterion) { fn bench_contention(c: &mut Criterion) { let mut rng = CsRng::from_entropy(); - #[cfg(feature = "rust-mem")] - bench_memory_contention( - "in-memory", - N_PTS, - async || InMemory::default(), - c, - async |m: &InMemory<_, _>| -> Result<(), String> { - m.clear(); - Ok(()) - }, - &mut rng, - ); - #[cfg(feature = "redis-mem")] bench_memory_contention( "Redis", N_PTS, - async || RedisMemory::connect(&get_redis_url()).await.unwrap(), + async || RedisMemory::new_with_url(&get_redis_url()).await.unwrap(), c, RedisMemory::clear, &mut rng, @@ -247,7 +216,13 @@ fn bench_contention(c: &mut Criterion) { bench_memory_contention( "SQLite", N_PTS, - async || SqliteMemory::connect(SQLITE_PATH).await.unwrap(), + async || { + let m = SqliteMemory::new_with_path(SQLITE_PATH, "bench_memory_contention".to_string()) + .await + .unwrap(); + m.initialize().await.unwrap(); + m + }, c, SqliteMemory::clear, &mut rng, @@ -272,8 +247,9 @@ fn bench_contention(c: &mut Criterion) { #[cfg(any(feature = "redis-mem", feature = "postgres-mem"))] mod delayed_memory { - use cosmian_findex::{ - Address, MemoryADT, PostgresMemory, PostgresMemoryError, RedisMemory, RedisMemoryError, + use cosmian_findex::{Address, MemoryADT}; + use cosmian_findex_memories::{ + PostgresMemory, PostgresMemoryError, RedisMemory, RedisMemoryError, }; use rand::Rng; use rand_distr::StandardNormal; @@ -369,7 +345,7 @@ fn bench_one_to_many(c: &mut Criterion) { N_PTS, async || { DelayedMemory::new( - RedisMemory::connect(&get_redis_url()).await.unwrap(), + RedisMemory::new_with_url(&get_redis_url()).await.unwrap(), *mean, *variance, ) diff --git a/crate/memories/docker-compose.yml b/crate/memories/docker-compose.yml new file mode 100644 index 00000000..4a58a2fc --- /dev/null +++ b/crate/memories/docker-compose.yml @@ -0,0 +1,25 @@ +--- +services: + redis: + image: redis:latest + ports: + - 6379:6379 + healthcheck: + test: [CMD, redis-cli, ping] + interval: 10s + timeout: 5s + retries: 5 + postgres: + image: postgres + ports: + - 5432:5432 + environment: + - POSTGRES_USER=cosmian + - POSTGRES_DB=cosmian + - POSTGRES_PASSWORD=cosmian + - PGDATA=/tmp/postgres3 + healthcheck: + test: [CMD, pg_isready, -U, cosmian] + interval: 10s + timeout: 5s + retries: 5 diff --git a/crate/memories/examples/example.rs b/crate/memories/examples/example.rs new file mode 100644 index 00000000..e3174bd6 --- /dev/null +++ b/crate/memories/examples/example.rs @@ -0,0 +1,97 @@ +//! This example show-cases the use of Findex to securely store a hash-map with different back ends. 
+#[path = "shared_utils.rs"]
+mod shared_utils;
+
+use cosmian_crypto_core::{CsRng, Secret, reexport::rand_core::SeedableRng};
+use cosmian_findex_memories::{
+    PostgresMemory, PostgresMemoryError, RedisMemory, SqliteMemory,
+    reexport::cosmian_findex::{ADDRESS_LENGTH, Address, Findex, IndexADT, MemoryEncryptionLayer},
+};
+use deadpool_postgres::{Config, Pool};
+use futures::executor::block_on;
+use shared_utils::{WORD_LENGTH, decoder, encoder, gen_index};
+use std::collections::HashMap;
+use tokio_postgres::NoTls;
+
+const DB_URL: &str = "postgres://cosmian:cosmian@localhost/cosmian";
+const DB_PATH: &str = "redis://localhost:6379";
+const DB_PATH2: &str = "./target/debug/sqlite-test.db";
+const TABLE_NAME: &str = "findex_memory";
+
+async fn create_pool(db_url: &str) -> Result<Pool, PostgresMemoryError> {
+    let mut pg_config = Config::new();
+    pg_config.url = Some(db_url.to_string());
+    let pool = pg_config.builder(NoTls)?.build()?;
+    Ok(pool)
+}
+
+#[tokio::main]
+async fn main() {
+    // For cryptographic applications, it is important to use a secure RNG. In
+    // Rust, such RNGs implement the `CryptoRng` trait.
+    let mut rng = CsRng::from_entropy();
+
+    // Generate a fresh Findex key. In practice only one user is in charge of
+    // generating the key (the administrator?): all users *must* share the same
+    // key in order to make the index interoperable.
+    let key = Secret::random(&mut rng);
+
+    // Generating the random index.
+    let index = gen_index(&mut rng);
+
+    // This example uses our Redis-based implementation of `MemoryADT`.
+    let redis_memory =
+        RedisMemory::<Address<ADDRESS_LENGTH>, [u8; WORD_LENGTH]>::new_with_url(DB_PATH)
+            .await
+            .unwrap();
+
+    // You can also use our SQLite-based implementation of `MemoryADT`.
+    let _sqlite_memory = SqliteMemory::<Address<ADDRESS_LENGTH>, [u8; WORD_LENGTH]>::new_with_path(
+        DB_PATH2,
+        TABLE_NAME.to_owned(),
+    )
+    .await
+    .unwrap();
+
+    // Alternatively, use the Postgres-based implementation of `MemoryADT`. Refer to README.md for
+    // details on how to set up the database to use this example.
+    let pool = create_pool(DB_URL).await.unwrap();
+    let _postgres_memory =
+        PostgresMemory::<Address<ADDRESS_LENGTH>, [u8; WORD_LENGTH]>::new_with_pool(
+            pool.clone(),
+            TABLE_NAME.to_string(),
+        )
+        .await;
+
+    // Adding an encryption layer to the chosen memory.
+    let encrypted_memory = MemoryEncryptionLayer::new(&key, redis_memory);
+
+    // Instantiating Findex requires passing the (encrypted) memory and the
+    // encoder and decoder. Quite simple, after all :)
+    let findex = Findex::<
+        WORD_LENGTH, // size of a word
+        u64,         // type of a value
+        String,      // type of an encoding error
+        _,           // type of the memory
+    >::new(encrypted_memory, encoder, decoder);
+
+    // Here we insert all bindings one by one, blocking on each call. A better
+    // way would be to perform all such calls in parallel using tasks.
+    index
+        .clone()
+        .into_iter()
+        .for_each(|(kw, vs)| block_on(findex.insert(kw, vs)).expect("insert failed"));
+
+    // In order to verify insertion was correctly performed, we search for all
+    // the indexed keywords...
+    let res = index
+        .keys()
+        .cloned()
+        .map(|kw| (kw, block_on(findex.search(&kw)).expect("search failed")))
+        .collect::<HashMap<_, _>>();
+
+    // ... and verify we get the whole index back!
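+    // Both maps bind each keyword to the *set* of its values, so a successful
+    // round-trip makes the equality below hold.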
+    assert_eq!(res, index);
+
+    println!("All good!");
+}
diff --git a/crate/memories/examples/shared_utils.rs b/crate/memories/examples/shared_utils.rs
new file mode 100644
index 00000000..af828b8a
--- /dev/null
+++ b/crate/memories/examples/shared_utils.rs
@@ -0,0 +1,85 @@
+use cosmian_crypto_core::reexport::rand_core::CryptoRngCore;
+use cosmian_findex::Op;
+use std::collections::{HashMap, HashSet};
+
+/// This function generates a random set of (key, values) couples. Since
+/// Findex's API is that of an index, which only returns the *set* of values
+/// associated to a given key, all random values generated here are stored in
+/// a `HashSet`.
+///
+/// In Findex's jargon, we say that these sets of values are *bound* to their
+/// associated *keyword*, of type `[u8; 8]` in this example. Since Findex only
+/// requires the values to be hashable, we could have taken any type that
+/// implements `Hash` instead of `u64`.
+pub fn gen_index(rng: &mut impl CryptoRngCore) -> HashMap<[u8; 8], HashSet<u64>> {
+    (0..6)
+        .map(|i| {
+            let kw = rng.next_u64().to_be_bytes();
+            let vals = (0..10_i64.pow(i) as usize)
+                .map(|_| rng.next_u64())
+                .collect::<HashSet<_>>();
+            (kw, vals)
+        })
+        .collect()
+}
+
+/// The encoder uses 1 bit to encode the operation (insert or delete), and 7
+/// bits to encode the number of values (of type u64) in the word. This allows
+/// for words of up to 2^7 = 128 values, each serialized into an array of 8
+/// bytes. The `WORD_LENGTH` is therefore 1 byte of metadata plus 128 * 8 bytes
+/// of values.
+pub const WORD_LENGTH: usize = 1 + 8 * 128;
+
+pub fn encoder(op: Op, values: HashSet<u64>) -> Result<Vec<[u8; WORD_LENGTH]>, String> {
+    let mut words = Vec::new(); // This could be initialized with the correct size.
+    let mut values = values.into_iter().peekable();
+    while values.peek().is_some() {
+        let chunk = (0..128)
+            .filter_map(|_| values.next())
+            .map(|v| v.to_be_bytes())
+            .collect::<Vec<_>>();
+
+        let metadata =
+            <u8>::try_from(chunk.len() - 1).unwrap() + if let Op::Insert = op { 128 } else { 0 };
+        let mut word = [0; WORD_LENGTH];
+        word[0] = metadata;
+        chunk
+            .into_iter()
+            .enumerate()
+            .for_each(|(i, v)| word[1 + i * 8..1 + (i + 1) * 8].copy_from_slice(v.as_slice()));
+        words.push(word);
+    }
+    Ok(words)
+}
+
+pub fn decoder(words: Vec<[u8; WORD_LENGTH]>) -> Result<HashSet<u64>, String> {
+    let mut values = HashSet::new();
+    words.into_iter().for_each(|w| {
+        let metadata = w[0];
+
+        // Extract the highest bit to recover the operation.
+        let op = if metadata < 128 {
+            Op::Delete
+        } else {
+            Op::Insert
+        };
+
+        // Remove the highest bit to recover the number of values.
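+        // For instance, an insertion of three values is encoded as
+        // metadata = 128 + (3 - 1) = 130; masking with 127 then recovers
+        // n = 2, and the loop below reads the values at indices 0..=2.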
+        let n = metadata & 127;
+
+        for i in 0..=n as usize {
+            let v = u64::from_be_bytes(w[1 + i * 8..1 + (i + 1) * 8].try_into().unwrap());
+            if let Op::Insert = op {
+                values.insert(v);
+            } else {
+                values.remove(&v);
+            }
+        }
+    });
+
+    Ok(values)
+}
+
+#[allow(dead_code)]
+fn main() {
+    panic!("This is a utility module and should not be run directly.");
+}
diff --git a/crate/memories/src/lib.rs b/crate/memories/src/lib.rs
new file mode 100644
index 00000000..25f593c8
--- /dev/null
+++ b/crate/memories/src/lib.rs
@@ -0,0 +1,26 @@
+#[cfg(feature = "redis-mem")]
+mod redis_mem;
+#[cfg(feature = "redis-mem")]
+pub use redis_mem::{RedisMemory, RedisMemoryError};
+
+#[cfg(feature = "sqlite-mem")]
+mod sqlite_mem;
+#[cfg(feature = "sqlite-mem")]
+pub use sqlite_mem::{SqliteMemory, SqliteMemoryError};
+
+#[cfg(feature = "postgres-mem")]
+mod postgresql_mem;
+#[cfg(feature = "postgres-mem")]
+pub use postgresql_mem::{PostgresMemory, PostgresMemoryError};
+
+pub mod reexport {
+    #[cfg(feature = "sqlite-mem")]
+    pub use async_sqlite;
+    pub use cosmian_findex;
+    #[cfg(feature = "postgres-mem")]
+    pub use deadpool_postgres;
+    #[cfg(feature = "redis-mem")]
+    pub use redis;
+    #[cfg(feature = "postgres-mem")]
+    pub use tokio_postgres;
+}
diff --git a/src/memory/postgresql_store/error.rs b/crate/memories/src/postgresql_mem/error.rs
similarity index 100%
rename from src/memory/postgresql_store/error.rs
rename to crate/memories/src/postgresql_mem/error.rs
diff --git a/src/memory/postgresql_store/memory.rs b/crate/memories/src/postgresql_mem/memory.rs
similarity index 61%
rename from src/memory/postgresql_store/memory.rs
rename to crate/memories/src/postgresql_mem/memory.rs
index 1ed592ef..9d60d3f0 100644
--- a/src/memory/postgresql_store/memory.rs
+++ b/crate/memories/src/postgresql_mem/memory.rs
@@ -1,11 +1,7 @@
 use super::PostgresMemoryError;
-use crate::{Address, MemoryADT};
+use cosmian_findex::{Address, MemoryADT};
 use deadpool_postgres::Pool;
 use std::marker::PhantomData;
-use tokio_postgres::{
-    Socket,
-    tls::{MakeTlsConnect, TlsConnect},
-};

 #[derive(Clone, Debug)]
 pub struct PostgresMemory<Address, Word> {
@@ -17,65 +13,38 @@ impl<const ADDRESS_LENGTH: usize, const WORD_LENGTH: usize>
     PostgresMemory<Address<ADDRESS_LENGTH>, [u8; WORD_LENGTH]>
 {
-    /// Connect to a Postgres database and create a table if it doesn't exist
-    pub async fn initialize_table<T>(
-        &self,
-        db_url: String,
-        table_name: String,
-        tls: T,
-    ) -> Result<(), PostgresMemoryError>
-    where
-        T: MakeTlsConnect<Socket> + Send,
-        T::Stream: Send + 'static,
-        T::TlsConnect: Send,
-        <T::TlsConnect as TlsConnect<Socket>>::Future: Send,
-    {
-        let (client, connection) = tokio_postgres::connect(&db_url, tls).await?;
-
-        // The connection object performs the actual communication with the database.
-        // `Connection` only resolves when the connection is closed, either because a fatal error has
-        // occurred, or because its associated `Client` has dropped and all outstanding work has completed.
-        let conn_handle = tokio::spawn(async move {
-            if let Err(e) = connection.await {
-                eprintln!("connection error: {}", e);
-            }
-        });
+    /// Returns a new memory instance from the given connection pool to a
+    /// PostgreSQL database.
+    pub async fn new_with_pool(pool: Pool, table_name: String) -> Self {
+        Self {
+            pool,
+            table_name,
+            _marker: PhantomData,
+        }
+    }

-        let returned = client
+    /// Creates the memory table in the associated PostgreSQL database if it
+    /// doesn't already exist.
+    pub async fn initialize(&self) -> Result<(), PostgresMemoryError> {
+        self.pool
+            .get()
+            .await?
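+            // `CREATE TABLE IF NOT EXISTS` is idempotent, so repeated
+            // `initialize` calls are harmless.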
            .execute(
                 &format!(
-                    "
-        CREATE TABLE IF NOT EXISTS {} (
+                    "CREATE TABLE IF NOT EXISTS {} (
                         a BYTEA PRIMARY KEY CHECK (octet_length(a) = {}),
                         w BYTEA NOT NULL CHECK (octet_length(w) = {})
                     );",
-                    table_name, ADDRESS_LENGTH, WORD_LENGTH
+                    self.table_name, ADDRESS_LENGTH, WORD_LENGTH
                ),
                &[],
            )
            .await?;
-        if returned != 0 {
-            return Err(PostgresMemoryError::TableCreationError(returned));
-        }
-        drop(client);
-        let _ = conn_handle.await; // ensures that the connection is closed
         Ok(())
     }

-    /// Connect to a Postgres database and create a table if it doesn't exist
-    pub async fn connect_with_pool(
-        pool: Pool,
-        table_name: String,
-    ) -> Result<Self, PostgresMemoryError> {
-        Ok(Self {
-            pool,
-            table_name,
-            _marker: PhantomData,
-        })
-    }
-
-    /// Deletes all rows from the findex memory table
+    /// Clears all bindings from this memory.
     #[cfg(feature = "test-utils")]
     pub async fn clear(&self) -> Result<(), PostgresMemoryError> {
         self.pool
@@ -83,8 +52,38 @@ impl<const ADDRESS_LENGTH: usize, const WORD_LENGTH: usize>
             .await?
             .execute(&format!("TRUNCATE TABLE {};", self.table_name), &[])
             .await?;
+
         Ok(())
     }
+
+    fn gwrite_script(&self) -> String {
+        format!(
+            "
+            WITH
+                guard_check AS (
+                    SELECT w FROM {0} WHERE a = $1::bytea
+                ),
+                dedup_input_table AS (
+                    SELECT DISTINCT ON (a) a, w
+                    FROM UNNEST($3::bytea[], $4::bytea[]) WITH ORDINALITY AS t(a, w, order_idx)
+                    ORDER BY a, order_idx DESC
+                ),
+                insert_cte AS (
+                    INSERT INTO {0} (a, w)
+                    SELECT a, w FROM dedup_input_table AS t(a,w)
+                    WHERE (
+                        $2::bytea IS NULL AND NOT EXISTS (SELECT 1 FROM guard_check)
+                    ) OR (
+                        $2::bytea IS NOT NULL AND EXISTS (
+                            SELECT 1 FROM guard_check WHERE w = $2::bytea
+                        )
+                    )
+                    ON CONFLICT (a) DO UPDATE SET w = EXCLUDED.w
+                )
+            SELECT COALESCE((SELECT w FROM guard_check)) AS original_guard_value;",
+            self.table_name
+        )
+    }
 }

 impl<const ADDRESS_LENGTH: usize, const WORD_LENGTH: usize> MemoryADT
@@ -99,23 +98,22 @@ impl<const ADDRESS_LENGTH: usize, const WORD_LENGTH: usize> MemoryADT
         addresses: Vec<Self::Address>,
     ) -> Result<Vec<Option<Self::Word>>, Self::Error> {
         let client = self.pool.get().await?;
-        // statements are cached per connection and not per pool
+
+        // Statements are cached per connection and not per pool.
         let stmt = client
-            .prepare_cached(
-                format!(
-                    "SELECT f.w
+            .prepare_cached(&format!(
+                // The left join is necessary to ensure that the order of the
+                // addresses is preserved as well as to return None for addresses
+                // that don't exist.
+                "SELECT f.w
                 FROM UNNEST($1::bytea[]) WITH ORDINALITY AS params(addr, idx)
                 LEFT JOIN {} f ON params.addr = f.a
                 ORDER BY params.idx;",
-                    self.table_name
-                )
-                .as_str(),
-            )
+                self.table_name
+            ))
             .await?;

         client
-            // the left join is necessary to ensure that the order of the addresses is preserved
-            // as well as to return None for addresses that don't exist
            .query(
                &stmt,
                &[&addresses
@@ -126,13 +124,10 @@
            .await?
            .iter()
            .map(|row| {
-                let bytes_slice: Option<&[u8]> = row.try_get("w")?; // `row.get(0)` can panic
-                bytes_slice.map_or(Ok(None), |slice| {
-                    slice
-                        .try_into()
-                        .map(Some)
-                        .map_err(|_| PostgresMemoryError::InvalidDataLength(slice.len()))
-                })
+                row.try_get::<_, Option<&[u8]>>("w")?
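+                    // A NULL word (address not found) stays `None`; otherwise
+                    // the byte slice is converted into a fixed-size word.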
+                    .map(Self::Word::try_from)
+                    .transpose()
+                    .map_err(PostgresMemoryError::TryFromSliceError)
            })
            .collect()
    }

@@ -142,45 +137,19 @@ impl<const ADDRESS_LENGTH: usize, const WORD_LENGTH: usize> MemoryADT
         guard: (Self::Address, Option<Self::Word>),
         bindings: Vec<(Self::Address, Self::Word)>,
     ) -> Result<Option<Self::Word>, Self::Error> {
-        let g_write_script = format!(
-            "
-            WITH
-                guard_check AS (
-                    SELECT w FROM {0} WHERE a = $1::bytea
-                ),
-                dedup_input_table AS (
-                    SELECT DISTINCT ON (a) a, w
-                    FROM UNNEST($3::bytea[], $4::bytea[]) WITH ORDINALITY AS t(a, w, order_idx)
-                    ORDER BY a, order_idx DESC
-                ),
-                insert_cte AS (
-                    INSERT INTO {0} (a, w)
-                    SELECT a, w FROM dedup_input_table AS t(a,w)
-                    WHERE (
-                        $2::bytea IS NULL AND NOT EXISTS (SELECT 1 FROM guard_check)
-                    ) OR (
-                        $2::bytea IS NOT NULL AND EXISTS (
-                            SELECT 1 FROM guard_check WHERE w = $2::bytea
-                        )
-                    )
-                    ON CONFLICT (a) DO UPDATE SET w = EXCLUDED.w
-                )
-            SELECT COALESCE((SELECT w FROM guard_check)) AS original_guard_value;",
-            self.table_name
-        );
-
         let (addresses, words): (Vec<[u8; ADDRESS_LENGTH]>, Vec<[u8; WORD_LENGTH]>) =
             bindings.into_iter().map(|(a, w)| (*a, w)).unzip();

-        const MAX_RETRIES: usize = 10;
-        for _ in 0..MAX_RETRIES {
-            // while counterintuitive, getting a new client on each retry is a better approach
-            // than trying to reuse the same client since it allows other operations to use the
-            // connection between retries.
+        // Retries only happen on serialization failures, i.e. when a
+        // concurrent transaction committed first. Some write therefore always
+        // makes progress, which makes this lock-free loop safe.
+        loop {
+            // Do not lock a resource for a potentially long loop; instead,
+            // request a new one at each iteration.
             let mut client = self.pool.get().await?;
-            let stmt = client.prepare_cached(g_write_script.as_str()).await?;
-            let result = async {
+            let stmt = client.prepare_cached(&self.gwrite_script()).await?;
+
+            let res = async {
                 let tx = client
                     .build_transaction()
                     .isolation_level(
@@ -200,21 +169,26 @@ impl<const ADDRESS_LENGTH: usize, const WORD_LENGTH: usize> MemoryADT
                    ],
                )
                .await?
-                .map_or(
-                    Ok::<Option<[u8; WORD_LENGTH]>, PostgresMemoryError>(None),
-                    |row| {
-                        row.try_get::<_, Option<&[u8]>>(0)?
-                            .map_or(Ok(None), |r| Ok(Some(r.try_into()?)))
-                    },
-                )?;
+                .map(|row| {
+                    row.try_get::<_, Option<&[u8]>>(0)?
+                        .map(Self::Word::try_from)
+                        .transpose()
+                        .map_err(PostgresMemoryError::TryFromSliceError)
+                })
+                .transpose()?
+                .flatten();
+
                 tx.commit().await?;
+
                 Ok(res)
            }
            .await;
-            match result {
+
+            match res {
                Ok(value) => return Ok(value),
                Err(err) => {
-                    // Retry on serialization failures (error code 40001), otherwise fail and return the error
+                    // Retry on serialization failures (error code 40001),
+                    // otherwise fail and return the error.
                    if let PostgresMemoryError::TokioPostgresError(pg_err) = &err {
                        if pg_err.code().is_some_and(|code| code.code() == "40001") {
                            continue;
@@ -224,30 +198,18 @@ impl<const ADDRESS_LENGTH: usize, const WORD_LENGTH: usize> MemoryADT
                }
            }
        }
-        Err(PostgresMemoryError::RetryExhaustedError(MAX_RETRIES))
    }
 }
+
 #[cfg(test)]
 mod tests {
-    //! To run the postgresql benchmarks locally, add the following service to your pg_service.conf file
-    //! (usually under ~/.pg_service.conf):
-    //!
-    //!     [cosmian_service]
-    //!     host=localhost
-    //!     dbname=cosmian
-    //!     user=cosmian
-    //!
password=cosmian
-    use deadpool_postgres::Config;
-    use tokio_postgres::NoTls;
-
     use super::*;
-    use crate::{
-        ADDRESS_LENGTH, Address, WORD_LENGTH,
-        test_utils::{
-            gen_seed, test_guarded_write_concurrent, test_rw_same_address,
-            test_single_write_and_read, test_wrong_guard,
-        },
+    use cosmian_findex::{
+        ADDRESS_LENGTH, WORD_LENGTH, gen_seed, test_guarded_write_concurrent, test_rw_same_address,
+        test_single_write_and_read, test_wrong_guard,
     };
+    use deadpool_postgres::Config;
+    use tokio_postgres::NoTls;

     const DB_URL: &str = "postgres://cosmian:cosmian@localhost/cosmian";

@@ -259,7 +221,8 @@ mod tests {
         Ok(pool)
     }

-    // Setup function that handles pool creation, memory initialization, test execution, and cleanup
+    // Setup function that handles pool creation, memory initialization, test
+    // execution, and cleanup
     async fn setup_and_run_test<F, Fut>(
         table_name: &str,
         test_fn: F,
     ) -> Result<(), PostgresMemoryError>
     where
         F: FnOnce(PostgresMemory<Address<ADDRESS_LENGTH>, [u8; WORD_LENGTH]>) -> Fut + Send,
         Fut: std::future::Future<Output = ()> + Send,
     {
         let test_pool = create_testing_pool(DB_URL).await.unwrap();
-        let m = PostgresMemory::<Address<ADDRESS_LENGTH>, [u8; WORD_LENGTH]>::connect_with_pool(
-            test_pool.clone(),
-            table_name.to_string(),
-        )
-        .await?;
+        let m = PostgresMemory::new_with_pool(test_pool.clone(), table_name.to_string()).await;

-        m.initialize_table(DB_URL.to_string(), table_name.to_string(), NoTls)
-            .await?;
+        m.initialize().await?;

         test_fn(m).await;

@@ -294,14 +252,13 @@
     async fn test_initialization() -> Result<(), PostgresMemoryError> {
         let table_name: &str = "test_initialization";
         let test_pool = create_testing_pool(DB_URL).await.unwrap();
-        let m = PostgresMemory::<Address<ADDRESS_LENGTH>, [u8; WORD_LENGTH]>::connect_with_pool(
+        let m = PostgresMemory::<Address<ADDRESS_LENGTH>, [u8; WORD_LENGTH]>::new_with_pool(
             test_pool.clone(),
             table_name.to_string(),
         )
-        .await?;
+        .await;

-        m.initialize_table(DB_URL.to_string(), table_name.to_string(), NoTls)
-            .await?;
+        m.initialize().await?;

         // check that the table actually exists
         let client = test_pool.get().await?;
@@ -330,7 +287,7 @@
     #[tokio::test]
     async fn test_rw_seq() -> Result<(), PostgresMemoryError> {
         setup_and_run_test("findex_test_rw_seq", |m| async move {
-            test_single_write_and_read::<WORD_LENGTH, _>(&m, gen_seed()).await;
+            test_single_write_and_read(&m, gen_seed()).await;
         })
         .await
     }

     #[tokio::test]
     async fn test_guard_seq() -> Result<(), PostgresMemoryError> {
         setup_and_run_test("findex_test_guard_seq", |m| async move {
-            test_wrong_guard::<WORD_LENGTH, _>(&m, gen_seed()).await;
+            test_wrong_guard(&m, gen_seed()).await;
         })
         .await
     }

     #[tokio::test]
     async fn test_rw_same_address_seq() -> Result<(), PostgresMemoryError> {
         setup_and_run_test("findex_test_rw_same_address_seq", |m| async move {
-            test_rw_same_address::<WORD_LENGTH, _>(&m, gen_seed()).await;
+            test_rw_same_address(&m, gen_seed()).await;
         })
         .await
     }

     #[tokio::test]
     async fn test_rw_ccr() -> Result<(), PostgresMemoryError> {
         setup_and_run_test("findex_test_rw_ccr", |m| async move {
-            test_guarded_write_concurrent::<WORD_LENGTH, _>(&m, gen_seed(), Some(100)).await;
+            test_guarded_write_concurrent(&m, gen_seed(), Some(100)).await;
         })
         .await
     }
diff --git a/src/memory/postgresql_store/mod.rs b/crate/memories/src/postgresql_mem/mod.rs
similarity index 100%
rename from src/memory/postgresql_store/mod.rs
rename to crate/memories/src/postgresql_mem/mod.rs
diff --git a/src/memory/redis_store.rs b/crate/memories/src/redis_mem.rs
similarity index 82%
rename from src/memory/redis_store.rs
rename to crate/memories/src/redis_mem.rs
index 5d4801be..8825459b 100644
---
a/src/memory/redis_store.rs
+++ b/crate/memories/src/redis_mem.rs
@@ -1,4 +1,4 @@
-use crate::{Address, MemoryADT};
+use cosmian_findex::{Address, MemoryADT};
 use redis::aio::ConnectionManager;
 use std::{fmt, marker::PhantomData};
@@ -60,8 +60,8 @@ impl fmt::Debug for RedisMemory {
 }

 impl<Address, Word> RedisMemory<Address, Word> {
-    /// Connects to a Redis server with a `ConnectionManager`.
-    pub async fn connect_with_manager(
+    /// Returns a new instance using this connection manager.
+    pub async fn new_with_manager(
         mut manager: ConnectionManager,
     ) -> Result<Self, RedisMemoryError> {
         let script_hash = redis::cmd("SCRIPT")
@@ -77,13 +77,14 @@ impl<Address, Word> RedisMemory<Address, Word> {
         })
     }

-    /// Connects to a Redis server using the given URL.
-    pub async fn connect(url: &str) -> Result<Self, RedisMemoryError> {
+    /// Returns a new instance using the Redis server at the given URL.
+    pub async fn new_with_url(url: &str) -> Result<Self, RedisMemoryError> {
         let client = redis::Client::open(url)?;
         let manager = client.get_connection_manager().await?;
-        Self::connect_with_manager(manager).await
+        Self::new_with_manager(manager).await
     }

+    /// Clears all bindings from this memory.
     #[cfg(feature = "test-utils")]
     pub async fn clear(&self) -> Result<(), RedisMemoryError> {
         redis::cmd("FLUSHDB")
@@ -106,7 +107,6 @@ impl MemoryADT
     ) -> Result<Vec<Option<Self::Word>>, Self::Error> {
         let mut cmd = redis::cmd("MGET");
         let cmd = addresses.iter().fold(&mut cmd, |c, a| c.arg(&**a));
-        // Cloning the connection manager is cheap since it is an `Arc`.
         cmd.query_async(&mut self.manager.clone())
             .await
             .map_err(RedisMemoryError::RedisError)
@@ -118,6 +118,7 @@ impl MemoryADT
         bindings: Vec<(Self::Address, Self::Word)>,
     ) -> Result<Option<Self::Word>, Self::Error> {
         let (guard_address, guard_value) = guard;
+
         let mut cmd = redis::cmd("EVALSHA");
         let cmd = cmd
             .arg(self.script_hash.as_str())
                     .map(|bytes| bytes.as_slice())
                     .unwrap_or(b"false".as_slice()),
             );
-
         let cmd = bindings
             .iter()
             .fold(cmd.arg(bindings.len()), |cmd, (a, w)| cmd.arg(&**a).arg(w));
-
-        // Cloning the connection manager is cheap since it is an `Arc`.
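+        // Redis runs the script atomically server-side: it compares the word
+        // stored at the guard address against the guard value, applies all
+        // bindings only on a match, and returns the previous guard word.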
cmd.query_async(&mut self.manager.clone())
             .await
             .map_err(RedisMemoryError::RedisError)
@@ -145,13 +143,9 @@
 mod tests {
     use super::*;
-    use crate::{
-        WORD_LENGTH,
-        test_utils::gen_seed,
-        {
-            test_guarded_write_concurrent, test_rw_same_address, test_single_write_and_read,
-            test_wrong_guard,
-        },
+    use cosmian_findex::{
+        WORD_LENGTH, gen_seed, test_guarded_write_concurrent, test_rw_same_address,
+        test_single_write_and_read, test_wrong_guard,
     };

     fn get_redis_url() -> String {
@@ -163,25 +157,25 @@ mod tests {
     #[tokio::test]
     async fn test_rw_seq() {
-        let m = RedisMemory::connect(&get_redis_url()).await.unwrap();
+        let m = RedisMemory::new_with_url(&get_redis_url()).await.unwrap();
         test_single_write_and_read::<WORD_LENGTH, _>(&m, gen_seed()).await
     }

     #[tokio::test]
     async fn test_guard_seq() {
-        let m = RedisMemory::connect(&get_redis_url()).await.unwrap();
+        let m = RedisMemory::new_with_url(&get_redis_url()).await.unwrap();
         test_wrong_guard::<WORD_LENGTH, _>(&m, gen_seed()).await
     }

     #[tokio::test]
     async fn test_collision_seq() {
-        let m = RedisMemory::connect(&get_redis_url()).await.unwrap();
+        let m = RedisMemory::new_with_url(&get_redis_url()).await.unwrap();
         test_rw_same_address::<WORD_LENGTH, _>(&m, gen_seed()).await
     }

     #[tokio::test]
     async fn test_rw_ccr() {
-        let m = RedisMemory::connect(&get_redis_url()).await.unwrap();
+        let m = RedisMemory::new_with_url(&get_redis_url()).await.unwrap();
         test_guarded_write_concurrent::<WORD_LENGTH, _>(&m, gen_seed(), None).await
     }
 }
diff --git a/src/memory/sqlite_store.rs b/crate/memories/src/sqlite_mem.rs
similarity index 61%
rename from src/memory/sqlite_store.rs
rename to crate/memories/src/sqlite_mem.rs
index 36d4f12e..2057d6dd 100644
--- a/src/memory/sqlite_store.rs
+++ b/crate/memories/src/sqlite_mem.rs
@@ -1,8 +1,8 @@
-use crate::{Address, MemoryADT};
 use async_sqlite::{
     Pool, PoolBuilder,
     rusqlite::{OptionalExtension, params_from_iter},
 };
+use cosmian_findex::{Address, MemoryADT};
 use std::{
     collections::HashMap,
     fmt::{self, Debug},
@@ -34,6 +34,7 @@ impl From<async_sqlite::Error> for SqliteMemoryError {
 #[derive(Clone)]
 pub struct SqliteMemory<Address, Word> {
     pool: Pool,
+    table_name: String,
     _marker: PhantomData<(Address, Word)>,
 }

@@ -46,44 +47,64 @@ impl<Address, Word> Debug for SqliteMemory<Address, Word> {
     }
 }

-// The following settings are used to improve performance:
-// - journal_mode = WAL : WAL journaling is faster than the default DELETE mode.
-// - synchronous = NORMAL: Reduces disk I/O by only calling fsync() at critical moments rather
-//   than after every transaction (FULL mode); this does not compromise data integrity.
-const CREATE_TABLE_SCRIPT: &str = "
-PRAGMA synchronous = NORMAL;
-PRAGMA journal_mode = WAL;
-CREATE TABLE IF NOT EXISTS memory (
-    a BLOB PRIMARY KEY,
-    w BLOB NOT NULL
-);";
-
 impl<Address, Word> SqliteMemory<Address, Word> {
-    /// Connects to a known DB using the given path.
-    ///
-    /// # Arguments
-    ///
-    /// * `path` - The path to the sqlite3 database file.
-    pub async fn connect(path: &str) -> Result<Self, SqliteMemoryError> {
-        // This pool connections number defaults to the number of logical CPUs
-        // of the current system.
+    /// Returns a new memory instance using a pool of connections to the SQLite
+    /// database at the given path.
+    pub async fn new_with_path(
+        path: impl AsRef<Path>,
+        table_name: String,
+    ) -> Result<Self, SqliteMemoryError> {
+        // The number of connections in this pool defaults to the number of
+        // logical CPUs of the system.
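+        // A caller-managed pool can be supplied via `new_with_pool` instead.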
let pool = PoolBuilder::new().path(path).open().await?;

-        pool.conn(move |conn| conn.execute_batch(CREATE_TABLE_SCRIPT))
-            .await?;
-
         Ok(Self {
             pool,
+            table_name,
             _marker: PhantomData,
         })
     }
-}

-impl<Address, Word> SqliteMemory<Address, Word> {
+    /// Returns a new memory instance using a pool of connections to an SQLite
+    /// database.
+    pub async fn new_with_pool(pool: Pool, table_name: String) -> Self {
+        Self {
+            pool,
+            table_name,
+            _marker: PhantomData,
+        }
+    }
+
+    /// Creates the memory table in the associated database if it does not exist.
+    pub async fn initialize(&self) -> Result<(), SqliteMemoryError> {
+        // The following settings are used to improve performance:
+        // - journal_mode = WAL : WAL journaling is faster than the default
+        //   DELETE mode.
+        // - synchronous = NORMAL: Reduces disk I/O by only calling fsync() at
+        //   critical moments rather than after every transaction (FULL mode);
+        //   this does not compromise data integrity.
+        let initialization_script = format!(
+            "PRAGMA synchronous = NORMAL;
+            PRAGMA journal_mode = WAL;
+            CREATE TABLE IF NOT EXISTS {} (
+                a BLOB PRIMARY KEY,
+                w BLOB NOT NULL
+            );",
+            self.table_name
+        );
+
+        self.pool
+            .conn(move |conn| conn.execute_batch(&initialization_script))
+            .await?;
+        Ok(())
+    }
+
+    /// Clears all bindings from this memory.
     #[cfg(feature = "test-utils")]
     pub async fn clear(&self) -> Result<(), SqliteMemoryError> {
+        let script = format!("DELETE FROM {}", self.table_name);
         self.pool
-            .conn(|cnx| cnx.execute("DELETE FROM memory", []))
+            .conn(move |conn| conn.execute(&script, []))
             .await?;
         Ok(())
     }
@@ -100,11 +121,13 @@ impl MemoryADT
         &self,
         addresses: Vec<Self::Address>,
     ) -> Result<Vec<Option<Self::Word>>, Self::Error> {
+        let findex_table_name = self.table_name.clone();
         self.pool
             .conn(move |conn| {
                 let results = conn
                     .prepare(&format!(
-                        "SELECT a, w FROM memory WHERE a IN ({})",
+                        "SELECT a, w FROM {} WHERE a IN ({})",
+                        findex_table_name,
                         vec!["?"; addresses.len()].join(",")
                     ))?
                     .query_map(
@@ -122,7 +145,7 @@ impl MemoryADT
                 // generate a returned value complying to the batch-read spec.
                 Ok(addresses
                     .iter()
-                    // Copying is necessary here since the same word could be
+                    // Copying is necessary since the same word could be
                     // returned multiple times.
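+                    // Each requested address is looked up in the result map,
+                    // preserving query order and yielding `None` when absent.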
.map(|addr| results.get(addr).copied())
                     .collect())
@@ -136,6 +159,7 @@ impl MemoryADT
         guard: (Self::Address, Option<Self::Word>),
         bindings: Vec<(Self::Address, Self::Word)>,
     ) -> Result<Option<Self::Word>, Self::Error> {
+        let findex_table_name = self.table_name.clone();
         let (ag, wg) = guard;

         self.pool
@@ -145,13 +169,18 @@ impl MemoryADT
                 )?;

                 let current_word = tx
-                    .query_row("SELECT w FROM memory WHERE a = ?", [&*ag], |row| row.get(0))
+                    .query_row(
+                        &format!("SELECT w FROM {} WHERE a = ?", findex_table_name),
+                        [&*ag],
+                        |row| row.get(0),
+                    )
                     .optional()?;

                 if current_word == wg {
                     tx.execute(
                         &format!(
-                            "INSERT OR REPLACE INTO memory (a, w) VALUES {}",
+                            "INSERT OR REPLACE INTO {} (a, w) VALUES {}",
+                            findex_table_name,
                             vec!["(?,?)"; bindings.len()].join(",")
                         ),
                         params_from_iter(
@@ -175,42 +204,47 @@ impl MemoryADT
 mod tests {
     use super::*;
-    use crate::{
+    use cosmian_findex::{
         WORD_LENGTH, gen_seed, test_guarded_write_concurrent, test_rw_same_address,
         test_single_write_and_read, test_wrong_guard,
     };

-    const DB_PATH: &str = "./target/debug/sqlite-test.db";
+    const DB_PATH: &str = "../../target/debug/sqlite-test.sqlite.db";
+    const TABLE_NAME: &str = "findex_memory";

     #[tokio::test]
     async fn test_rw_seq() {
-        let m = SqliteMemory::<_, [u8; WORD_LENGTH]>::connect(DB_PATH)
+        let m = SqliteMemory::<_, [u8; WORD_LENGTH]>::new_with_path(DB_PATH, TABLE_NAME.to_owned())
             .await
             .unwrap();
+        m.initialize().await.unwrap();
         test_single_write_and_read(&m, gen_seed()).await
     }

     #[tokio::test]
     async fn test_guard_seq() {
-        let m = SqliteMemory::<_, [u8; WORD_LENGTH]>::connect(DB_PATH)
+        let m = SqliteMemory::<_, [u8; WORD_LENGTH]>::new_with_path(DB_PATH, TABLE_NAME.to_owned())
             .await
             .unwrap();
+        m.initialize().await.unwrap();
         test_wrong_guard(&m, gen_seed()).await
     }

     #[tokio::test]
     async fn test_collision_seq() {
-        let m = SqliteMemory::<_, [u8; WORD_LENGTH]>::connect(DB_PATH)
+        let m = SqliteMemory::<_, [u8; WORD_LENGTH]>::new_with_path(DB_PATH, TABLE_NAME.to_owned())
             .await
             .unwrap();
+        m.initialize().await.unwrap();
         test_rw_same_address(&m, gen_seed()).await
     }

     #[tokio::test]
     async fn test_rw_ccr() {
-        let m = SqliteMemory::<_, [u8; WORD_LENGTH]>::connect(DB_PATH)
+        let m = SqliteMemory::<_, [u8; WORD_LENGTH]>::new_with_path(DB_PATH, TABLE_NAME.to_owned())
             .await
             .unwrap();
+        m.initialize().await.unwrap();
         test_guarded_write_concurrent(&m, gen_seed(), Some(100)).await
     }
 }
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
index c8181991..45a64fa6 100644
--- a/rust-toolchain.toml
+++ b/rust-toolchain.toml
@@ -1,3 +1,3 @@
 [toolchain]
-channel = "1.85.0"
+channel = "1.87.0"
 components = ["rustfmt"]
diff --git a/src/error.rs b/src/error.rs
deleted file mode 100644
index 8954f877..00000000
--- a/src/error.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-use std::fmt::{Debug, Display};
-
-#[derive(Debug)]
-pub enum Error<Address>
{
-    Parsing(String),
-    Memory(String),
-    Conversion(String),
-    MissingValue(Address, usize),
-    CorruptedMemoryCache,
-}
-
-impl<Address: Debug> Display for Error<Address>
{
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{self:?}")
-    }
-}
-
-impl<Address: Debug> std::error::Error for Error<Address>
{} diff --git a/src/memory/mod.rs b/src/memory/mod.rs deleted file mode 100644 index 4846f78e..00000000 --- a/src/memory/mod.rs +++ /dev/null @@ -1,22 +0,0 @@ -mod encryption_layer; -pub use encryption_layer::{KEY_LENGTH, MemoryEncryptionLayer}; - -#[cfg(any(test, feature = "test-utils"))] -mod in_memory_store; -#[cfg(any(test, feature = "test-utils"))] -pub use in_memory_store::InMemory; - -#[cfg(feature = "redis-mem")] -mod redis_store; -#[cfg(feature = "redis-mem")] -pub use redis_store::{RedisMemory, RedisMemoryError}; - -#[cfg(feature = "sqlite-mem")] -mod sqlite_store; -#[cfg(feature = "sqlite-mem")] -pub use sqlite_store::{SqliteMemory, SqliteMemoryError}; - -#[cfg(feature = "postgres-mem")] -mod postgresql_store; -#[cfg(feature = "postgres-mem")] -pub use postgresql_store::{PostgresMemory, PostgresMemoryError};
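All three backends touched by this patch implement the same `MemoryADT` contract: a vectorized `batch_read` plus an atomic conditional write. The sketch below is an illustration only, not part of the patch: the `ToyMemory` type, its `Mutex`-based atomicity, and the `main` usage are assumptions made here to show the guarded-write semantics the Postgres CTE, the Redis Lua script, and the SQLite transaction above each realize.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// A toy, single-process memory illustrating the guarded-write contract.
struct ToyMemory<const WORD_LENGTH: usize> {
    inner: Mutex<HashMap<Vec<u8>, [u8; WORD_LENGTH]>>,
}

impl<const WORD_LENGTH: usize> ToyMemory<WORD_LENGTH> {
    /// Atomically: read the word at the guard address, write all bindings
    /// if and only if it equals the guard word, and return the word that was
    /// stored at the guard address before the call.
    fn guarded_write(
        &self,
        (guard_address, guard_word): (Vec<u8>, Option<[u8; WORD_LENGTH]>),
        bindings: Vec<(Vec<u8>, [u8; WORD_LENGTH])>,
    ) -> Option<[u8; WORD_LENGTH]> {
        let mut store = self.inner.lock().expect("poisoned lock");
        let current = store.get(&guard_address).copied();
        if current == guard_word {
            store.extend(bindings);
        }
        current
    }
}

fn main() {
    let m = ToyMemory::<8> { inner: Mutex::new(HashMap::new()) };
    // First write succeeds: the guard address is empty (None == None).
    assert_eq!(m.guarded_write((vec![0], None), vec![(vec![0], [1; 8])]), None);
    // A stale guard fails and returns the current word instead.
    assert_eq!(m.guarded_write((vec![0], None), vec![(vec![0], [2; 8])]), Some([1; 8]));
}
```

Returning the previous guard word either way is what lets callers detect a lost race and retry, which is the same compare-and-swap shape the Postgres serialization-failure loop relies on.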