diff --git a/Cargo.lock b/Cargo.lock index 382e0ddd..1db42a36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -298,6 +298,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "foldhash" version = "0.2.0" @@ -459,6 +465,15 @@ dependencies = [ "byteorder", ] +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "foldhash 0.1.5", +] + [[package]] name = "hashbrown" version = "0.16.1" @@ -482,7 +497,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.16.1", ] [[package]] @@ -561,7 +576,7 @@ checksum = "1603ded11e2d4b590d75324f9f2f6b6f73f01c225b1826bbb3a8fecc685472d7" dependencies = [ "cpulist", "derive_more", - "foldhash", + "foldhash 0.2.0", "folo_ffi", "heapless", "itertools 0.14.0", @@ -635,7 +650,7 @@ version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1fc32f79859abf6783cce0e3d443db672e02ae28de8985ac592c1751183501d" dependencies = [ - "foldhash", + "foldhash 0.2.0", "new_zealand", "num-traits", ] @@ -1114,6 +1129,17 @@ dependencies = [ "thread_aware_macros", ] +[[package]] +name = "thread_aware_cache" +version = "0.1.0" +dependencies = [ + "criterion", + "hashbrown 0.15.5", + "parking_lot", + "rand", + "thread_aware", +] + [[package]] name = "thread_aware_macros" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 4e113ec0..4ce53afc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ ohno = { path = "crates/ohno", default-features = false, version = "0.2.0" } ohno_macros = { path = "crates/ohno_macros", default-features = false, version = "0.2.0" } testing_aids = { path = "crates/testing_aids", default-features = false, version = "0.0.0" } thread_aware = { path = "crates/thread_aware", default-features = false, version = "0.4.0" } +thread_aware_cache = { path = "crates/thread_aware_cache", default-features = false, version = "0.1.0" } thread_aware_macros = { path = "crates/thread_aware_macros", default-features = false, version = "0.4.0" } thread_aware_macros_impl = { path = "crates/thread_aware_macros_impl", default-features = false, version = "0.4.0" } @@ -46,6 +47,7 @@ criterion = { version = "0.7.0", default-features = false } derive_more = { version = "2.0.1", default-features = false } duct = { version = "1.1.1", default-features = false } futures = { version = "0.3.31", default-features = false } +hashbrown = { version = "0.15.2", default-features = false } infinity_pool = { version = "0.8.1", default-features = false } insta = { version = "1.44.1", default-features = false } many_cpus = { version = "1.1.0", default-features = false } @@ -54,6 +56,7 @@ mutants = { version = "0.0.3", default-features = false } new_zealand = { version = "1.0.1", default-features = false } nm = { version = "0.1.21", default-features = false } once_cell = { version = "1.21.3", default-features = false } +parking_lot = { version = "0.12.3", default-features = false } pretty_assertions = { version = "1.4.1", 
default-features = false } prettyplease = { version = "0.2.37", default-features = false } proc-macro2 = { version = "1.0.103", default-features = false } diff --git a/crates/thread_aware_cache/CHANGELOG.md b/crates/thread_aware_cache/CHANGELOG.md new file mode 100644 index 00000000..97c243ce --- /dev/null +++ b/crates/thread_aware_cache/CHANGELOG.md @@ -0,0 +1,14 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [Unreleased] + +## [0.1.0] - 2024-12-09 + +### Added + +- Initial release of `thread_aware_cache`. diff --git a/crates/thread_aware_cache/Cargo.toml b/crates/thread_aware_cache/Cargo.toml new file mode 100644 index 00000000..2245c743 --- /dev/null +++ b/crates/thread_aware_cache/Cargo.toml @@ -0,0 +1,42 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +[package] +name = "thread_aware_cache" +description = "A high-performance, NUMA-aware in-memory cache with SIEVE eviction." +version = "0.1.0" +readme = "README.md" +keywords = ["oxidizer", "cache", "numa", "sieve", "concurrent"] +categories = ["caching", "concurrency", "data-structures"] + +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[features] +default = [] +test-util = [] +threads = ["thread_aware/threads"] + +[dependencies] +hashbrown = { workspace = true, features = ["raw-entry", "default-hasher"] } +parking_lot.workspace = true +thread_aware = { workspace = true, features = ["threads"] } + +[dev-dependencies] +criterion.workspace = true +rand = { workspace = true, features = ["std", "std_rng"] } + +[lints] +workspace = true + +[[bench]] +name = "cache" +harness = false diff --git a/crates/thread_aware_cache/README.md b/crates/thread_aware_cache/README.md new file mode 100644 index 00000000..c4d785e7 --- /dev/null +++ b/crates/thread_aware_cache/README.md @@ -0,0 +1,227 @@ +
+ +# Thread-Aware Cache + +[![crate.io](https://img.shields.io/crates/v/thread_aware_cache.svg)](https://crates.io/crates/thread_aware_cache) +[![docs.rs](https://docs.rs/thread_aware_cache/badge.svg)](https://docs.rs/thread_aware_cache) +[![MSRV](https://img.shields.io/crates/msrv/thread_aware_cache)](https://crates.io/crates/thread_aware_cache) +[![CI](https://github.com/microsoft/oxidizer/workflows/main/badge.svg)](https://github.com/microsoft/oxidizer/actions) +[![Coverage](https://codecov.io/gh/microsoft/oxidizer/graph/badge.svg?token=FCUG0EL5TI)](https://codecov.io/gh/microsoft/oxidizer) +[![License](https://img.shields.io/badge/license-MIT-blue.svg)](../LICENSE) + +
+ +* [Summary](#summary) +* [Architecture](#architecture) +* [Performance Characteristics](#performance-characteristics) +* [Pros and Cons](#pros-and-cons) +* [Example](#example) +* [Cross-Shard Behavior](#cross-shard-behavior) +* [Bloom Filter Optimization](#bloom-filter-optimization) +* [SIEVE Algorithm](#sieve-algorithm) +* [References](#references) + +## Summary + + + +A high-performance, NUMA-aware in-memory cache with SIEVE eviction. + +This crate provides [`NumaCache`], a cache designed for high read-throughput and low-latency +on multi-socket architectures. It combines several techniques: + +1. **Topology-Aware Sharding:** Data is partitioned by physical NUMA nodes to minimize + QPI/UPI interconnect traffic. +2. **Swiss Table Storage:** Utilizes [`hashbrown`] for SIMD-accelerated lookups. +3. **SIEVE Eviction Policy:** A scan-resistant, efficient eviction algorithm that outperforms + LRU in concurrent environments by minimizing metadata writes on reads. +4. **Thread-Aware Integration:** Built on top of [`thread_aware`] for true NUMA locality + via [`PinnedAffinity`]-based shard routing. +5. **Read-Through Replication:** Automatic cross-shard cloning promotes hot data to the + local shard, improving locality for subsequent accesses. +6. **Bloom Filter Optimization:** A lock-free Bloom filter provides fast negative lookups, + avoiding expensive cross-shard scans when a key definitely doesn't exist. +7. **NUMA-Aware Memory Allocation:** When a `ThreadRegistry` is provided, each shard is + allocated while pinned to its corresponding NUMA node, leveraging the OS's first-touch + memory policy for optimal memory locality. + +## Architecture + +The cache operates on a **Shared-Nothing-per-Node** model with **automatic locality promotion**. +Rather than sharding by key hash (which causes random memory access across sockets), +we shard by **Thread Affinity**. + +When a key is requested from a shard where it doesn't exist, but exists in another shard, +the value is automatically cloned to the local shard. This "read-through with local caching" +approach ensures: + +- **Fast path:** Local lookups are O(1) with zero interconnect traffic +- **Automatic locality:** Hot data migrates to where it's being accessed +- **Independent eviction:** Each shard manages its own capacity via SIEVE + +Each shard is cache-line aligned (64 bytes) to prevent false sharing between locks. + +### NUMA-Aware Memory Allocation + +For optimal performance on multi-socket systems, you can provide a `ThreadRegistry` to +ensure shard memory is allocated on the correct NUMA node: + +```rust,ignore +use std::sync::Arc; +use thread_aware_cache::{NumaCache, ThreadRegistry, ProcessorCount}; + +// Create a registry with all available processors +let registry = Arc::new(ThreadRegistry::new(&ProcessorCount::All)); +let affinities: Vec<_> = registry.affinities().collect(); + +// Build cache with NUMA-aware allocation +let cache = NumaCache::::builder() + .affinities(&affinities) + .registry(Arc::clone(®istry)) + .capacity_per_shard(10000) + .build(); +``` + +When a registry is provided, the builder pins the current thread to each affinity before +allocating that shard's memory. This leverages the operating system's **first-touch memory +policy**, ensuring each shard's data structures are physically allocated on the correct +NUMA node. + +## Performance Characteristics + +| Metric | Complexity | Notes | +| :--- | :--- | :--- | +| **Lookup (Local Hit)** | O(1) | Zero-interconnect traffic. 
| +| **Lookup (Remote Hit)** | O(n) shards | Clones to local shard for future O(1) access. | +| **Insertion** | Amortized O(1) | Includes potential eviction scan. | +| **Eviction** | Amortized O(1) | SIEVE hand movement is minimal in practice. | +| **Removal** | O(n) shards | Removes from all shards (due to replication). | +| **Concurrency** | Sharded `RwLock` | No false sharing due to explicit padding. | + +## Pros and Cons + +### Pros + +* **Excellent NUMA Locality:** Minimizes cross-socket traffic (QPI/UPI) by keeping data local to the thread that accesses it. +* **High Read Throughput:** Local hits are fast and contention-free across sockets. +* **Automatic Hot Data Migration:** Frequently accessed data naturally moves to the shard where it is needed via read-through replication. +* **Scan-Resistant Eviction:** The SIEVE algorithm handles scan workloads better than LRU and requires less locking overhead on reads. +* **False Sharing Prevention:** Explicit padding ensures locks on different shards reside on different cache lines. + +### Cons + +* **Higher Memory Usage:** Because data is replicated to the local shard on access, the same key-value pair can exist in multiple shards simultaneously. This trades memory capacity for latency. +* **Expensive Removes:** Removing a key is an $O(N)$ operation because it requires acquiring write locks on *all* shards to ensure the value is removed from every replica. +* **Bloom Filter Saturation:** In workloads with very high churn (continuous inserts and deletes over a long period), the shared Bloom filter may eventually saturate (fill with 1s), reducing the effectiveness of negative lookups. +* **Not Ideal for Write-Heavy Workloads:** The overhead of replication and multi-shard consistency checks makes this cache less suitable for write-heavy or delete-heavy scenarios compared to a simple partitioned cache. + +## Example + +Use the `affinities()` builder method to create a cache with one shard per affinity, +ensuring each shard is explicitly associated with a specific affinity (e.g., NUMA node): + +```rust +use thread_aware_cache::{NumaCache, PinnedAffinity}; +use thread_aware::create_manual_pinned_affinities; + +// Create affinities representing 4 NUMA nodes with 1 processor each +let affinities = create_manual_pinned_affinities(&[1, 1, 1, 1]); + +// Create a cache with one shard per affinity +let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(10000) + .build(); + +// The cache now has exactly 4 shards, one per affinity +assert_eq!(cache.num_shards(), 4); + +// Insert data from NUMA node 0 +cache.insert(affinities[0], "key".to_string(), 42); + +// Access from node 0 (local hit - fast path) +assert_eq!(cache.get(affinities[0], &"key".to_string()), Some(42)); + +// Access from node 1 (cross-shard, clones locally for future access) +assert_eq!(cache.get(affinities[1], &"key".to_string()), Some(42)); + +// Future accesses from node 1 are now local hits +assert_eq!(cache.get(affinities[1], &"key".to_string()), Some(42)); +``` + +## Cross-Shard Behavior + +The cache automatically handles cross-shard access patterns: + +- **get()**: Checks local shard first, then consults the Bloom filter. If the key might exist, + searches other shards. On remote hit, clones to local. +- **insert()**: Always inserts to the specified affinity's shard and adds the key to the + shared Bloom filter. +- **remove()**: Removes from ALL shards (since values may be replicated). 
The Bloom filter + retains the key (stale positive), which is safe but may cause unnecessary cross-shard + lookups for removed keys. + +This design is ideal for read-heavy workloads where data naturally becomes "hot" on certain +nodes and should migrate there for optimal NUMA locality. + +## Bloom Filter Optimization + +The cache uses a **lock-free Bloom filter** to optimize cross-shard lookups. Before scanning +all shards for a key that's not in the local shard, the cache consults the Bloom filter: + +- **Negative result (definitely not in cache):** Returns `None` immediately, avoiding + expensive cross-shard scans. This is the fast path for cache misses. +- **Positive result (might be in cache):** Proceeds with cross-shard search. False positives + are possible but safe—they just result in an unnecessary scan. + +### Implementation Details + +| Parameter | Value | Notes | +| :--- | :--- | :--- | +| **Bits per item** | 10 | Provides ~1% false positive rate | +| **Hash functions** | 7 | Optimal for 10 bits/item | +| **Storage** | `AtomicU64` array | Lock-free, cache-friendly | +| **Hash derivation** | Kirsch-Mitzenmacher | Two hashes → k probes via `h1 + i * h2` | + +### Trade-offs + +The Bloom filter does **not** support removal. When a key is removed from the cache, it +remains in the Bloom filter as a "stale positive." This design choice keeps the implementation +simple and lock-free, with the only cost being occasional unnecessary cross-shard scans for +removed keys. For typical cache workloads where reads vastly outnumber removes, this is an +acceptable trade-off. + +### Future Improvement + +For long-running cache instances with high churn (many inserts and removes over time), +stale positives can accumulate and degrade lookup performance. An improved Bloom filter +could address this by tracking insertion locations per bit position, enabling proper removal +support at the cost of increased memory usage (bit per shard instead of 1 bit). + +## SIEVE Algorithm + +SIEVE is superior to LRU because it does not require pointer manipulation on reads, +only a boolean flag update. This makes it particularly efficient in concurrent environments. + +On access, we simply set a `visited` flag to `true`. On eviction, we scan from the "hand" +position, clearing `visited` flags until we find an unvisited entry to evict. + + + +## When to Use + +Use `thread_aware_cache` when you need a concurrent cache on multi-socket (NUMA) hardware +where minimizing cross-node memory access is critical for performance. + +## References + +- [SIEVE: NSDI '24](https://www.usenix.org/conference/nsdi24/presentation/zhang-yazhuo) – + *SIEVE is Simpler than LRU: an Efficient Turn-Key Eviction Algorithm for Web Caches* +- [Swiss Table](https://abseil.io/about/design/swisstables) – Google Abseil `flat_hash_map` +- [False Sharing](https://www.intel.com/content/www/us/en/developer/articles/technical/avoiding-and-identifying-false-sharing-among-threads.html) – Intel Developer Guide on Cache Line definitions (64 bytes) + +
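+## Appendix: SIEVE Eviction Sketch
+
+To make the hand mechanics described in the SIEVE section concrete, here is a minimal,
+self-contained sketch of the eviction scan. The `SieveQueue` type and its layout are
+illustrative assumptions for this README only; the crate's actual `SieveList` in `sieve.rs`
+(not shown in this excerpt) is structured differently and integrates with the shard's hash map.
+
+```rust
+/// Minimal SIEVE queue over plain values (illustration only).
+struct SieveQueue<T> {
+    /// Entries paired with their `visited` flag, oldest first.
+    entries: Vec<(T, bool)>,
+    /// The eviction hand: index of the next candidate to examine.
+    hand: usize,
+}
+
+impl<T> SieveQueue<T> {
+    /// On access, SIEVE only flips a flag; nothing is moved or relinked.
+    fn mark_visited(&mut self, index: usize) {
+        if let Some(entry) = self.entries.get_mut(index) {
+            entry.1 = true;
+        }
+    }
+
+    /// Evicts one entry: walk the hand, clearing `visited` flags, until an
+    /// unvisited entry is found, then remove and return it.
+    fn evict(&mut self) -> Option<T> {
+        if self.entries.is_empty() {
+            return None;
+        }
+        loop {
+            if self.hand >= self.entries.len() {
+                self.hand = 0; // wrap around like a clock hand
+            }
+            if self.entries[self.hand].1 {
+                // Recently visited: spare it this round and keep scanning.
+                self.entries[self.hand].1 = false;
+                self.hand += 1;
+            } else {
+                // Unvisited: this is the victim. The hand stays put and now
+                // points at whatever shifted into the vacated slot.
+                return Some(self.entries.remove(self.hand).0);
+            }
+        }
+    }
+}
+```
+
+The property the README relies on holds in the sketch as well: a cache hit only sets a flag,
+while eviction does the bookkeeping, which is why SIEVE avoids LRU's per-read pointer updates.
+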

+ +This crate was developed as part of [The Oxidizer Project](https://github.com/microsoft/oxidizer). + +
diff --git a/crates/thread_aware_cache/benches/cache.rs b/crates/thread_aware_cache/benches/cache.rs new file mode 100644 index 00000000..34e9e8c9 --- /dev/null +++ b/crates/thread_aware_cache/benches/cache.rs @@ -0,0 +1,383 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Benchmarks for the NUMA cache. + +#![expect(missing_docs, reason = "Benchmark code does not require documentation")] + +use std::hint::black_box; +use std::sync::Arc; +use std::thread; + +use criterion::{BatchSize, BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; +use rand::Rng; +use rand::SeedableRng; +use rand::rngs::StdRng; +use thread_aware::create_manual_pinned_affinities; +use thread_aware_cache::NumaCache; + +criterion_group!(benches, bench_basic, bench_concurrent, bench_cross_shard, bench_bloom_filter); +criterion_main!(benches); + +const CACHE_CAPACITY: usize = 10_000; +const NUM_SHARDS: usize = 4; +const TOTAL_CAPACITY: usize = CACHE_CAPACITY * NUM_SHARDS; + +fn bench_basic(c: &mut Criterion) { + let mut group = c.benchmark_group("NumaCache"); + + // Create affinities for benchmarks + let affinities = create_manual_pinned_affinities(&[1, 1, 1, 1]); + + // Benchmark cache creation + group.bench_function("new", |b| { + b.iter(|| { + black_box( + NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(CACHE_CAPACITY) + .build(), + ) + }); + }); + + // Benchmark single-threaded insert + group.throughput(Throughput::Elements(1)); + group.bench_function("insert_single", |b| { + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(CACHE_CAPACITY) + .build(); + let affinity = affinities[0]; + + let mut key = 0usize; + b.iter(|| { + cache.insert(affinity, key, key); + key = (key + 1) % TOTAL_CAPACITY; + }); + }); + + // Benchmark single-threaded get (local hit) + group.bench_function("get_local_hit", |b| { + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(CACHE_CAPACITY) + .build(); + let affinity = affinities[0]; + + // Pre-populate the cache on the same affinity + for i in 0..CACHE_CAPACITY { + cache.insert(affinity, i, i); + } + + let mut rng = StdRng::seed_from_u64(42); + b.iter(|| { + let key = rng.random_range(0..CACHE_CAPACITY); + black_box(cache.get(affinity, &key)); + }); + }); + + // Benchmark single-threaded get (miss) + group.bench_function("get_miss", |b| { + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(CACHE_CAPACITY) + .build(); + let affinity = affinities[0]; + + let mut key = TOTAL_CAPACITY; // Start outside the range + b.iter(|| { + key += 1; + black_box(cache.get(affinity, &key)); + }); + }); + + // Benchmark eviction + group.bench_function("insert_with_eviction", |b| { + let single_affinity = create_manual_pinned_affinities(&[1]); + b.iter_batched( + || { + let cache = + NumaCache::::builder().affinities(&single_affinity).capacity_per_shard(100).build(); + let affinity = single_affinity[0]; + + // Fill the cache + for i in 0..100 { + cache.insert(affinity, i, i); + } + (cache, affinity) + }, + |(cache, affinity)| { + // Insert causing eviction + cache.insert(affinity, 1000, 1000); + }, + BatchSize::SmallInput, + ); + }); + + group.finish(); +} + +#[expect(clippy::too_many_lines, reason = "Benchmark function naturally groups related concurrent benchmarks")] +fn bench_concurrent(c: &mut Criterion) { + let mut concurrent_group = c.benchmark_group("NumaCache_Concurrent"); + let affinities = create_manual_pinned_affinities(&[1, 
1, 1, 1]); + + for num_threads in [2, 4, 8] { + concurrent_group.throughput(Throughput::Elements(1000)); + concurrent_group.bench_with_input( + BenchmarkId::new("concurrent_insert", num_threads), + &num_threads, + |b, &num_threads| { + b.iter(|| { + let cache = Arc::new( + NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(CACHE_CAPACITY) + .build(), + ); + let affinities = Arc::new(affinities.clone()); + + let handles: Vec<_> = (0..num_threads) + .map(|t| { + let cache = Arc::clone(&cache); + let affinities = Arc::clone(&affinities); + thread::spawn(move || { + let affinity = affinities[t % affinities.len()]; + for i in 0usize..1000 { + let key = t * 10000 + i; + cache.insert(affinity, key, key); + } + }) + }) + .collect(); + + for handle in handles { + handle.join().expect("thread panicked"); + } + }); + }, + ); + + concurrent_group.bench_with_input(BenchmarkId::new("concurrent_read_local", num_threads), &num_threads, |b, &num_threads| { + // Pre-populate the cache - each shard with its own data + let cache = Arc::new( + NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(CACHE_CAPACITY) + .build(), + ); + let affinities = Arc::new(affinities.clone()); + + // Each thread's data goes to its assigned shard + for t in 0..NUM_SHARDS { + let affinity = affinities[t]; + for i in 0..CACHE_CAPACITY { + let key = t * CACHE_CAPACITY + i; + cache.insert(affinity, key, key); + } + } + + b.iter(|| { + let handles: Vec<_> = (0..num_threads) + .map(|t| { + let cache = Arc::clone(&cache); + let affinities = Arc::clone(&affinities); + thread::spawn(move || { + let shard_idx = t % NUM_SHARDS; + let affinity = affinities[shard_idx]; + let mut rng = StdRng::seed_from_u64(u64::try_from(t).unwrap_or(0)); + for _ in 0..1000 { + // Read from local shard's key range + let key = shard_idx * CACHE_CAPACITY + rng.random_range(0..CACHE_CAPACITY); + black_box(cache.get(affinity, &key)); + } + }) + }) + .collect(); + + for handle in handles { + handle.join().expect("thread panicked"); + } + }); + }); + + concurrent_group.bench_with_input( + BenchmarkId::new("concurrent_mixed", num_threads), + &num_threads, + |b, &num_threads| { + b.iter(|| { + let cache = Arc::new( + NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(CACHE_CAPACITY) + .build(), + ); + let affinities = Arc::new(affinities.clone()); + + let handles: Vec<_> = (0..num_threads) + .map(|t| { + let cache = Arc::clone(&cache); + let affinities = Arc::clone(&affinities); + thread::spawn(move || { + let affinity = affinities[t % affinities.len()]; + let mut rng = StdRng::seed_from_u64(u64::try_from(t).unwrap_or(0)); + for i in 0..1000 { + let key = rng.random_range(0usize..10000); + if i % 10 == 0 { + // 10% writes + cache.insert(affinity, key, key); + } else { + // 90% reads + black_box(cache.get(affinity, &key)); + } + } + }) + }) + .collect(); + + for handle in handles { + handle.join().expect("thread panicked"); + } + }); + }, + ); + } + + concurrent_group.finish(); +} + +/// Benchmark cross-shard access patterns to measure the read-through replication overhead. 
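+///
+/// The first-access case below uses `iter_batched` so each measured `get` runs against a freshly
+/// populated cache and therefore pays the replication cost, rather than hitting a copy that an
+/// earlier iteration already promoted to the local shard.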
+fn bench_cross_shard(c: &mut Criterion) { + let mut group = c.benchmark_group("NumaCache_CrossShard"); + let affinities = create_manual_pinned_affinities(&[1, 1, 1, 1]); + + // Benchmark cross-shard get (first access - causes replication) + group.bench_function("get_cross_shard_first", |b| { + b.iter_batched( + || { + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(CACHE_CAPACITY) + .build(); + + // Insert on shard 0 + cache.insert(affinities[0], 42, 42); + cache + }, + |cache| { + // Get from shard 1 (cross-shard, triggers replication) + black_box(cache.get(affinities[1], &42)); + }, + BatchSize::SmallInput, + ); + }); + + // Benchmark cross-shard get (after replication - local hit) + group.bench_function("get_cross_shard_after_replication", |b| { + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(CACHE_CAPACITY) + .build(); + + // Insert on shard 0 + cache.insert(affinities[0], 42, 42); + // Trigger replication to shard 1 + let _ = cache.get(affinities[1], &42); + + b.iter(|| { + // Now it's a local hit on shard 1 + black_box(cache.get(affinities[1], &42)); + }); + }); + + // Benchmark worst case: all-miss cross-shard search + group.bench_function("get_all_miss", |b| { + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(CACHE_CAPACITY) + .build(); + + // Don't insert anything - all gets will search all shards and miss + let mut key = 0usize; + b.iter(|| { + key += 1; + black_box(cache.get(affinities[0], &key)); + }); + }); + + group.finish(); +} + +/// Benchmark the Bloom filter's impact on performance. +fn bench_bloom_filter(c: &mut Criterion) { + let mut group = c.benchmark_group("NumaCache_BloomFilter"); + let affinities = create_manual_pinned_affinities(&[1, 1, 1, 1]); + + // Benchmark Bloom filter negative lookup (key never existed) + group.bench_function("bloom_negative_lookup", |b| { + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(CACHE_CAPACITY) + .build(); + + // Pre-populate with some data (but not the keys we'll query) + let affinity = affinities[0]; + for i in 0..1000 { + cache.insert(affinity, i, i); + } + + // Query keys that were never inserted - Bloom filter should return None fast + let mut key = 100_000usize; + b.iter(|| { + key += 1; + black_box(cache.get(affinity, &key)); + }); + }); + + // Benchmark Bloom filter positive lookup (key exists in another shard) + group.bench_function("bloom_positive_cross_shard", |b| { + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(CACHE_CAPACITY) + .build(); + + // Insert on shard 0 + for i in 0..1000 { + cache.insert(affinities[0], i, i); + } + + // Query from shard 1 - Bloom filter says "maybe", then we find it cross-shard + let mut rng = StdRng::seed_from_u64(42); + b.iter(|| { + let key = rng.random_range(0..1000); + black_box(cache.get(affinities[1], &key)); + }); + }); + + // Benchmark heavy miss workload (where Bloom filter shines) + group.throughput(Throughput::Elements(1000)); + group.bench_function("heavy_miss_workload", |b| { + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(CACHE_CAPACITY) + .build(); + let affinity = affinities[0]; + + // Insert only 100 items + for i in 0..100 { + cache.insert(affinity, i, i); + } + + // 90% miss rate workload + let mut rng = StdRng::seed_from_u64(42); + b.iter(|| { + for _ in 0..1000 { + let key = rng.random_range(0usize..1000); // Only 100 keys exist + 
black_box(cache.get(affinity, &key)); + } + }); + }); + + group.finish(); +} diff --git a/crates/thread_aware_cache/src/bloom.rs b/crates/thread_aware_cache/src/bloom.rs new file mode 100644 index 00000000..4124cad7 --- /dev/null +++ b/crates/thread_aware_cache/src/bloom.rs @@ -0,0 +1,283 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Lock-free Bloom filter for fast negative lookups. +//! +//! This module provides a thread-safe Bloom filter implementation using atomic +//! operations. It's used by [`NumaCache`](crate::NumaCache) to quickly determine +//! if a key definitely doesn't exist in any shard, avoiding expensive cross-shard +//! searches for keys that were never inserted. + +use std::hash::{BuildHasher, Hash}; +use std::sync::atomic::{AtomicU64, Ordering}; + +/// A lock-free Bloom filter using atomic bit operations. +/// +/// This implementation uses the Kirsch-Mitzenmacher optimization, which derives +/// multiple hash functions from just two base hashes using the formula: +/// `h_i(x) = h1(x) + i * h2(x)` +/// +/// The filter supports concurrent insertions and queries without locking. +/// It does NOT support removal - once a bit is set, it stays set. This means +/// after removals from the cache, the Bloom filter may have "stale positives" +/// (reporting a key might exist when it doesn't), but it will never have +/// false negatives (reporting a key doesn't exist when it does). +pub(crate) struct BloomFilter { + /// Bit array stored as atomic u64s for lock-free access. + bits: Box<[AtomicU64]>, + /// Number of bits in the filter (always a power of 2 for fast modulo). + num_bits: usize, + /// Bit mask for fast modulo (`num_bits - 1`). + bit_mask: usize, + /// Number of hash functions to use. + num_hashes: u32, + /// Hash builder for computing hashes. + hasher: S, +} + +impl BloomFilter { + /// Creates a new Bloom filter sized for the expected number of items. + /// + /// The filter is sized to achieve approximately 1% false positive rate + /// at the expected capacity. The actual size is rounded up to a power of 2 + /// for efficient bit indexing. + /// + /// # Arguments + /// + /// * `expected_items` - Expected number of unique items to be inserted + /// * `hasher` - Hash builder for computing hashes + pub fn new(expected_items: usize, hasher: S) -> Self { + // Target ~1% false positive rate + // Optimal bits per item ≈ -ln(p) / ln(2)^2 ≈ 9.6 for p=0.01 + // We use 10 bits per item for simplicity + let bits_needed = expected_items.saturating_mul(10).max(64); + + // Round up to next power of 2 for fast modulo + let num_bits = bits_needed.next_power_of_two(); + let num_u64s = num_bits.div_ceil(64); + + // Optimal number of hash functions ≈ (m/n) * ln(2) ≈ 0.693 * bits_per_item + // For 10 bits per item: ~7 hash functions + let num_hashes = 7; + + let bits: Vec = (0..num_u64s).map(|_| AtomicU64::new(0)).collect(); + + Self { + bits: bits.into_boxed_slice(), + num_bits, + bit_mask: num_bits - 1, + num_hashes, + hasher, + } + } + + /// Inserts a key into the Bloom filter. + /// + /// This operation is lock-free and thread-safe. + pub fn insert(&self, key: &K) { + let (h1, h2) = self.compute_hashes(key); + + for i in 0..self.num_hashes { + let bit_index = self.get_bit_index(h1, h2, i); + self.set_bit(bit_index); + } + } + + /// Checks if a key might be in the filter. 
+ /// + /// Returns: + /// - `false` if the key is definitely NOT in the set (no false negatives) + /// - `true` if the key MIGHT be in the set (possible false positives) + /// + /// This operation is lock-free and thread-safe. + #[must_use] + pub fn might_contain(&self, key: &K) -> bool { + let (h1, h2) = self.compute_hashes(key); + + for i in 0..self.num_hashes { + let bit_index = self.get_bit_index(h1, h2, i); + if !self.get_bit(bit_index) { + return false; + } + } + true + } + + /// Computes two independent hashes for Kirsch-Mitzenmacher optimization. + fn compute_hashes(&self, key: &K) -> (u64, u64) { + let hash = self.hasher.hash_one(key); + + // Split the 64-bit hash into two 32-bit values by rotating + // This provides better independence than deriving h2 from h1 + let h1 = hash; + let h2 = hash.rotate_left(32); + + (h1, h2) + } + + /// Gets the bit index for the i-th hash function using Kirsch-Mitzenmacher. + #[inline] + #[expect(clippy::cast_possible_truncation, reason = "bit_mask ensures result fits in usize")] + fn get_bit_index(&self, h1: u64, h2: u64, i: u32) -> usize { + let combined = h1.wrapping_add(h2.wrapping_mul(u64::from(i))); + (combined as usize) & self.bit_mask + } + + /// Sets a bit at the given index using atomic OR. + #[inline] + fn set_bit(&self, bit_index: usize) { + let word_index = bit_index / 64; + let bit_offset = bit_index % 64; + let mask = 1u64 << bit_offset; + + // Relaxed ordering is sufficient - we only care about eventual visibility + // and the filter is tolerant of concurrent races (worst case: duplicate sets) + self.bits[word_index].fetch_or(mask, Ordering::Relaxed); + } + + /// Gets a bit at the given index. + #[inline] + fn get_bit(&self, bit_index: usize) -> bool { + let word_index = bit_index / 64; + let bit_offset = bit_index % 64; + let mask = 1u64 << bit_offset; + + // Relaxed ordering is sufficient - false positives are acceptable, + // and we never want false negatives (which can't happen with relaxed + // reads since bits are only ever set, never cleared) + (self.bits[word_index].load(Ordering::Relaxed) & mask) != 0 + } + + /// Returns the size of the filter in bits. 
+ #[cfg(test)] + #[must_use] + pub fn size_bits(&self) -> usize { + self.num_bits + } +} + +impl std::fmt::Debug for BloomFilter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BloomFilter") + .field("size_bits", &self.num_bits) + .field("num_hashes", &self.num_hashes) + .finish_non_exhaustive() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use hashbrown::DefaultHashBuilder; + + #[test] + fn test_bloom_filter_basic() { + let filter: BloomFilter = BloomFilter::new(1000, DefaultHashBuilder::default()); + + // Insert some keys + filter.insert(&"hello"); + filter.insert(&"world"); + filter.insert(&42i32); + + // Inserted keys should be found + assert!(filter.might_contain(&"hello")); + assert!(filter.might_contain(&"world")); + assert!(filter.might_contain(&42i32)); + + // Non-inserted keys should (probably) not be found + // Note: There's a small chance of false positives, but with 1000 capacity + // and only 3 insertions, it's extremely unlikely + assert!(!filter.might_contain(&"goodbye")); + assert!(!filter.might_contain(&"universe")); + assert!(!filter.might_contain(&999i32)); + } + + #[test] + fn test_bloom_filter_no_false_negatives() { + let filter: BloomFilter = BloomFilter::new(10000, DefaultHashBuilder::default()); + + // Insert many keys + for i in 0..1000 { + filter.insert(&i); + } + + // All inserted keys MUST be found (no false negatives allowed) + for i in 0..1000 { + assert!(filter.might_contain(&i), "key {i} should be found"); + } + } + + #[test] + fn test_bloom_filter_false_positive_rate() { + let filter: BloomFilter = BloomFilter::new(10000, DefaultHashBuilder::default()); + + // Insert 10000 keys + for i in 0..10000 { + filter.insert(&i); + } + + // Check 10000 keys that were NOT inserted + let mut false_positives = 0; + for i in 10000..20000 { + if filter.might_contain(&i) { + false_positives += 1; + } + } + + // With 10 bits per item and 7 hash functions, we expect ~1% FP rate + // Allow up to 3% to account for variance + let fp_rate = f64::from(false_positives) / 10000.0; + assert!( + fp_rate < 0.03, + "false positive rate {:.2}% is too high (expected < 3%)", + fp_rate * 100.0 + ); + } + + #[test] + fn test_bloom_filter_concurrent() { + use std::sync::Arc; + use std::thread; + + let filter = Arc::new(BloomFilter::::new(100_000, DefaultHashBuilder::default())); + + let mut handles = vec![]; + + // Spawn multiple writer threads + for t in 0..4 { + let filter = Arc::clone(&filter); + handles.push(thread::spawn(move || { + for i in 0..10000 { + let key = t * 100_000 + i; + filter.insert(&key); + } + })); + } + + // Wait for all writers + for handle in handles { + handle.join().expect("thread panicked"); + } + + // Verify all inserted keys are found + for t in 0..4 { + for i in 0..10000 { + let key = t * 100_000 + i; + assert!(filter.might_contain(&key), "key {key} should be found"); + } + } + } + + #[test] + fn test_bloom_filter_sizing() { + // Small filter + let small: BloomFilter = BloomFilter::new(100, DefaultHashBuilder::default()); + assert!(small.size_bits() >= 1000); // At least 10 bits per item + assert!(small.size_bits().is_power_of_two()); + + // Large filter + let large: BloomFilter = BloomFilter::new(1_000_000, DefaultHashBuilder::default()); + assert!(large.size_bits() >= 10_000_000); + assert!(large.size_bits().is_power_of_two()); + } +} diff --git a/crates/thread_aware_cache/src/cache.rs b/crates/thread_aware_cache/src/cache.rs new file mode 100644 index 00000000..49b9a217 --- /dev/null +++ 
b/crates/thread_aware_cache/src/cache.rs @@ -0,0 +1,691 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! NUMA-aware cache implementation. +//! +//! This module provides the main [`NumaCache`] type and its builder. + +use std::hash::Hash; +use std::sync::Arc; + +use hashbrown::DefaultHashBuilder; +use thread_aware::ThreadRegistry; + +use crate::bloom::BloomFilter; +use crate::shard::NumaShard; + +/// A high-performance, NUMA-aware in-memory cache with SIEVE eviction. +/// +/// The cache partitions data across multiple shards, with routing based on +/// thread affinity to minimize cross-NUMA traffic. Each shard is explicitly +/// associated with a [`thread_aware::PinnedAffinity`]. +/// +/// A shared Bloom filter optimizes cross-shard lookups by quickly identifying +/// keys that definitely don't exist in any shard, avoiding expensive O(n) searches. +/// +/// # Type Parameters +/// +/// * `K` - The key type, must implement `Eq + Hash + Clone`. +/// * `V` - The value type, must implement `Clone`. +/// * `S` - The hash builder type, defaults to `DefaultHashBuilder`. +/// +/// # Examples +/// +/// ``` +/// use thread_aware_cache::NumaCache; +/// use thread_aware::create_manual_pinned_affinities; +/// +/// let affinities = create_manual_pinned_affinities(&[1, 1, 1, 1]); +/// let cache = NumaCache::::builder() +/// .affinities(&affinities) +/// .capacity_per_shard(1000) +/// .build(); +/// +/// cache.insert(affinities[0], "hello".to_string(), 42); +/// assert_eq!(cache.get(affinities[0], &"hello".to_string()), Some(42)); +/// ``` +pub struct NumaCache { + /// The shards, one per affinity. + shards: Arc<[NumaShard]>, + /// The affinities corresponding to each shard. + affinities: Arc<[thread_aware::PinnedAffinity]>, + /// Shared Bloom filter for fast negative lookups across all shards. + bloom_filter: Arc>, +} + +impl std::fmt::Debug for NumaCache +where + K: Eq + Hash + Clone, + V: Clone, + S: std::hash::BuildHasher, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NumaCache") + .field("num_shards", &self.shards.len()) + .field("shards", &self.shards) + .field("bloom_filter", &self.bloom_filter) + .finish_non_exhaustive() + } +} + +impl NumaCache +where + K: Eq + Hash + Clone, + V: Clone, +{ + /// Creates a new builder for configuring a `NumaCache`. + #[must_use] + pub fn builder() -> NumaCacheBuilder { + NumaCacheBuilder::new() + } +} + +impl NumaCache +where + K: Eq + Hash + Clone + Send + Sync, + V: Clone + Send + Sync, + S: std::hash::BuildHasher + Default + Send + Sync, +{ + /// Creates a new cache with the specified shards, affinities, and Bloom filter. + fn from_shards( + shards: Vec>, + affinities: Vec, + bloom_filter: BloomFilter, + ) -> Self { + debug_assert_eq!(shards.len(), affinities.len(), "shards and affinities must have the same length"); + Self { + shards: shards.into(), + affinities: affinities.into(), + bloom_filter: Arc::new(bloom_filter), + } + } + + /// Returns the number of shards (one per affinity). + #[must_use] + pub fn num_shards(&self) -> usize { + self.shards.len() + } + + /// Returns the affinities associated with this cache. + /// + /// Each affinity corresponds to a shard at the same index. + #[must_use] + pub fn affinities(&self) -> &[thread_aware::PinnedAffinity] { + &self.affinities + } + + /// Returns the total number of entries across all shards. + /// + /// Note: This requires acquiring read locks on all shards, so it may not + /// be suitable for high-frequency calls. 
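+    ///
+    /// Because cross-shard reads replicate values into the caller's local shard, a key that has
+    /// been promoted to several shards is counted once per shard that currently holds a copy.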
+ #[must_use] + pub fn len(&self) -> usize { + self.shards.iter().map(NumaShard::len).sum() + } + + /// Returns `true` if the cache is empty. + #[must_use] + pub fn is_empty(&self) -> bool { + self.shards.iter().all(NumaShard::is_empty) + } + + /// Returns the total capacity across all shards. + #[must_use] + pub fn capacity(&self) -> usize { + self.shards.iter().map(NumaShard::capacity).sum() + } + + /// Clears all entries from the cache. + pub fn clear(&self) { + for shard in self.shards.iter() { + shard.clear(); + } + } + + /// Gets a reference to a specific shard by index. + /// + /// # Panics + /// + /// Panics if `index >= num_shards()`. + #[must_use] + pub fn shard(&self, index: usize) -> &NumaShard { + &self.shards[index] + } + + /// Returns the shard for the given affinity. + /// + /// This method looks up the shard that was explicitly associated with + /// the given affinity during cache construction. + fn select_shard_for_affinity(&self, affinity: thread_aware::PinnedAffinity) -> &NumaShard { + // Find the shard index by looking up the affinity in our stored affinities + let shard_index = self + .affinities + .iter() + .position(|a| *a == affinity) + .unwrap_or_else(|| { + // Fallback to memory region index if affinity not found + affinity.memory_region_index() % self.shards.len() + }); + &self.shards[shard_index] + } + + /// Looks up a key in the cache using affinity-based shard selection. + /// + /// This method first checks the local shard (associated with the given affinity) + /// for maximum NUMA locality. If the key is not found locally, it consults the + /// shared Bloom filter before searching other shards: + /// + /// - If the Bloom filter says the key definitely doesn't exist, return `None` immediately + /// - If the Bloom filter says the key might exist, search other shards + /// + /// When found in a remote shard, the value is automatically cloned to the local + /// shard, promoting future NUMA-local access. 
+ /// + /// This "read-through with local caching" approach provides: + /// - Fast path: O(1) lookup when data is already local + /// - Bloom filter optimization: O(1) for definite misses (no cross-shard search) + /// - Automatic locality promotion: Hot data migrates to where it's accessed + /// - Shard-local eviction: Each shard independently manages its capacity + /// + /// # Examples + /// + /// ``` + /// use thread_aware_cache::NumaCache; + /// use thread_aware::create_manual_pinned_affinities; + /// + /// let affinities = create_manual_pinned_affinities(&[1, 1]); + /// let cache = NumaCache::::builder() + /// .affinities(&affinities) + /// .build(); + /// + /// // Insert on shard 0 + /// cache.insert(affinities[0], "key".to_string(), 42); + /// + /// // Get from shard 0 (local hit) + /// assert_eq!(cache.get(affinities[0], &"key".to_string()), Some(42)); + /// + /// // Get from shard 1 (cross-shard clone) + /// assert_eq!(cache.get(affinities[1], &"key".to_string()), Some(42)); + /// + /// // Now the value is also in shard 1 (future accesses are local) + /// ``` + #[must_use] + pub fn get(&self, affinity: thread_aware::PinnedAffinity, key: &K) -> Option { + let local_shard = self.select_shard_for_affinity(affinity); + + // Fast path: check local shard first (NUMA-local access) + if let Some(value) = local_shard.get(key) { + return Some(value); + } + + // Bloom filter check: if key definitely doesn't exist anywhere, skip cross-shard search + if !self.bloom_filter.might_contain(key) { + return None; + } + + // Slow path: search other shards for the key + for shard in self.shards.iter() { + // Skip the local shard (already checked) + if std::ptr::eq(shard, local_shard) { + continue; + } + + if let Some(value) = shard.get(key) { + // Found in remote shard - clone to local shard for NUMA locality + // This promotes hot data to be local to where it's being accessed + local_shard.insert(key.clone(), value.clone()); + return Some(value); + } + } + + None + } + + /// Inserts a key-value pair using affinity-based shard selection. + /// + /// This method routes the insertion to the shard corresponding to the given + /// affinity, ensuring data is stored locally to that affinity's NUMA node. + /// The key is also added to the shared Bloom filter for cross-shard lookup + /// optimization. + /// + /// Returns the previous value if the key already existed. + /// + /// # Examples + /// + /// ``` + /// use thread_aware_cache::NumaCache; + /// use thread_aware::create_manual_pinned_affinities; + /// + /// let affinities = create_manual_pinned_affinities(&[1, 1]); + /// let cache = NumaCache::::builder() + /// .affinities(&affinities) + /// .build(); + /// assert!(cache.insert(affinities[0], "key".to_string(), 42).is_none()); + /// assert_eq!(cache.insert(affinities[0], "key".to_string(), 100), Some(42)); + /// ``` + pub fn insert(&self, affinity: thread_aware::PinnedAffinity, key: K, value: V) -> Option { + // Add to Bloom filter for cross-shard lookup optimization + self.bloom_filter.insert(&key); + self.select_shard_for_affinity(affinity).insert(key, value) + } + + /// Removes a key from the cache. + /// + /// Since values may be replicated across multiple shards (due to cross-shard + /// gets promoting data locality), this method removes the key from ALL shards + /// where it exists, returning the value from the first shard that contained it. 
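+    ///
+    /// Removal takes each shard's write lock in turn, so its cost grows with the number of
+    /// shards. The removed key is not cleared from the shared Bloom filter, so later lookups
+    /// for it may still probe every shard before returning `None`.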
+ /// + /// # Examples + /// + /// ``` + /// use thread_aware_cache::NumaCache; + /// use thread_aware::create_manual_pinned_affinities; + /// + /// let affinities = create_manual_pinned_affinities(&[1, 1]); + /// let cache = NumaCache::::builder() + /// .affinities(&affinities) + /// .build(); + /// + /// // Insert on shard 0 + /// cache.insert(affinities[0], "key".to_string(), 42); + /// + /// // Access from shard 1 (clones to shard 1) + /// let _ = cache.get(affinities[1], &"key".to_string()); + /// + /// // Remove - clears from ALL shards + /// assert_eq!(cache.remove(affinities[0], &"key".to_string()), Some(42)); + /// + /// // Key is gone from both shards + /// assert!(cache.get(affinities[0], &"key".to_string()).is_none()); + /// assert!(cache.get(affinities[1], &"key".to_string()).is_none()); + /// ``` + pub fn remove(&self, affinity: thread_aware::PinnedAffinity, key: &K) -> Option { + let local_shard = self.select_shard_for_affinity(affinity); + let mut result = local_shard.remove(key); + + // Remove from all other shards as well (value may have been replicated) + for shard in self.shards.iter() { + if std::ptr::eq(shard, local_shard) { + continue; + } + if let Some(value) = shard.remove(key) { + // Keep the first value we found if we haven't found one yet + if result.is_none() { + result = Some(value); + } + } + } + + result + } + + /// Returns the shard index for a given affinity. + /// + /// This looks up the affinity in the cache's stored affinities to find + /// the corresponding shard index. + #[must_use] + pub fn shard_index_for_affinity(&self, affinity: thread_aware::PinnedAffinity) -> usize { + self.affinities + .iter() + .position(|a| *a == affinity) + .unwrap_or_else(|| { + // Fallback to memory region index if affinity not found + affinity.memory_region_index() % self.shards.len() + }) + } +} + +impl Clone for NumaCache { + fn clone(&self) -> Self { + Self { + shards: Arc::clone(&self.shards), + affinities: Arc::clone(&self.affinities), + bloom_filter: Arc::clone(&self.bloom_filter), + } + } +} + +// Safety: NumaCache is Send if K, V, and S are Send + Sync +// The shards use RwLock internally which provides the synchronization +unsafe impl Send for NumaCache +where + K: Send + Sync, + V: Send + Sync, + S: Send + Sync, +{ +} + +// Safety: NumaCache is Sync if K, V, and S are Send + Sync +// The shards use RwLock internally which provides the synchronization +unsafe impl Sync for NumaCache +where + K: Send + Sync, + V: Send + Sync, + S: Send + Sync, +{ +} + +/// Builder for configuring a [`NumaCache`]. +/// +/// # Examples +/// +/// ``` +/// use thread_aware_cache::NumaCache; +/// use thread_aware::create_manual_pinned_affinities; +/// +/// let affinities = create_manual_pinned_affinities(&[1, 1, 1, 1]); +/// let cache = NumaCache::::builder() +/// .affinities(&affinities) +/// .capacity_per_shard(10000) +/// .build(); +/// ``` +#[derive(Debug)] +pub struct NumaCacheBuilder { + affinities: Vec, + capacity_per_shard: usize, + registry: Option>, + _marker: std::marker::PhantomData<(K, V, S)>, +} + +impl Default for NumaCacheBuilder { + fn default() -> Self { + Self::new() + } +} + +impl NumaCacheBuilder { + /// Creates a new builder with default settings. + /// + /// You must call [`affinities()`](Self::affinities) before [`build()`](Self::build) + /// to specify which affinities the cache shards will be associated with. 
+ /// + /// Defaults: + /// - `capacity_per_shard`: 1024 + #[must_use] + pub fn new() -> Self { + Self { + affinities: Vec::new(), + capacity_per_shard: 1024, + registry: None, + _marker: std::marker::PhantomData, + } + } + + /// Sets the affinities for the cache shards. + /// + /// Each affinity corresponds to a shard. The number of shards will equal + /// the number of affinities provided. This ensures that each shard is + /// associated with a specific affinity (e.g., NUMA node), enabling true + /// NUMA-local access patterns. + /// + /// # Examples + /// + /// ``` + /// use thread_aware_cache::NumaCache; + /// use thread_aware::create_manual_pinned_affinities; + /// + /// // Create 4 affinities representing 4 NUMA nodes + /// let affinities = create_manual_pinned_affinities(&[1, 1, 1, 1]); + /// + /// let cache = NumaCache::::builder() + /// .affinities(&affinities) + /// .capacity_per_shard(10000) + /// .build(); + /// + /// // Now the cache has exactly 4 shards, one per affinity + /// assert_eq!(cache.num_shards(), 4); + /// ``` + #[must_use] + pub fn affinities(mut self, affinities: &[thread_aware::PinnedAffinity]) -> Self { + self.affinities = affinities.to_vec(); + self + } + + /// Sets the capacity per shard. + #[must_use] + pub const fn capacity_per_shard(mut self, capacity: usize) -> Self { + self.capacity_per_shard = capacity; + self + } + + /// Sets the thread registry for NUMA-aware memory allocation. + /// + /// When a registry is provided, each shard will be allocated while the + /// current thread is pinned to the corresponding affinity. This leverages + /// the OS's first-touch memory policy to ensure shard data is allocated + /// on the correct NUMA node. + /// + /// If no registry is provided, shards are allocated on whatever NUMA node + /// the builder thread happens to be running on, which may not be optimal. + /// + /// # Examples + /// + /// ```ignore + /// use std::sync::Arc; + /// use thread_aware_cache::NumaCache; + /// use thread_aware::{ThreadRegistry, ProcessorCount}; + /// + /// let registry = Arc::new(ThreadRegistry::new(&ProcessorCount::All)); + /// let affinities: Vec<_> = registry.affinities().collect(); + /// + /// let cache = NumaCache::::builder() + /// .affinities(&affinities) + /// .registry(Arc::clone(®istry)) + /// .capacity_per_shard(10000) + /// .build(); + /// ``` + #[must_use] + pub fn registry(mut self, registry: Arc) -> Self { + self.registry = Some(registry); + self + } +} + +impl NumaCacheBuilder +where + K: Eq + Hash + Clone + Send + Sync, + V: Clone + Send + Sync, + S: std::hash::BuildHasher + Default + Clone + Send + Sync, +{ + /// Builds the cache with the configured settings. + /// + /// The cache will have one shard per affinity, with each shard associated + /// with its corresponding affinity. A shared Bloom filter is created sized + /// for the total capacity across all shards. + /// + /// If a [`ThreadRegistry`] was provided via [`registry()`](Self::registry), + /// each shard will be allocated while pinned to its corresponding affinity, + /// ensuring NUMA-local memory allocation via the OS's first-touch policy. + /// + /// # Panics + /// + /// Panics if no affinities have been set via [`affinities()`](Self::affinities). 
+ #[must_use] + pub fn build(self) -> NumaCache { + assert!(!self.affinities.is_empty(), "affinities must be set before building the cache"); + + let num_shards = self.affinities.len(); + let total_capacity = num_shards * self.capacity_per_shard; + + let shards: Vec> = match &self.registry { + Some(registry) => { + // NUMA-aware allocation: allocate each shard while pinned to its affinity. + // This leverages the OS's first-touch memory policy to ensure shard data + // is allocated on the correct NUMA node. + self.affinities + .iter() + .map(|affinity| { + registry.pin_to(*affinity); + NumaShard::new(self.capacity_per_shard) + }) + .collect() + } + None => { + // Non-NUMA-aware allocation: all shards allocated on current thread's node + self.affinities.iter().map(|_| NumaShard::new(self.capacity_per_shard)).collect() + } + }; + + // Create Bloom filter sized for total capacity across all shards + let bloom_filter = BloomFilter::new(total_capacity, S::default()); + + NumaCache::from_shards(shards, self.affinities, bloom_filter) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use thread_aware::create_manual_pinned_affinities; + + #[test] + fn test_cache_builder_custom() { + let affinities = create_manual_pinned_affinities(&[1, 1, 1, 1]); + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(100) + .build(); + + assert_eq!(cache.num_shards(), 4); + assert_eq!(cache.capacity(), 400); + } + + #[test] + fn test_cache_basic_operations() { + let affinities = create_manual_pinned_affinities(&[1, 1]); + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(10) + .build(); + + assert!(cache.is_empty()); + + // Insert + assert!(cache.insert(affinities[0], "key1".to_string(), 100).is_none()); + assert!(!cache.is_empty()); + assert_eq!(cache.len(), 1); + + // Get + assert_eq!(cache.get(affinities[0], &"key1".to_string()), Some(100)); + assert!(cache.get(affinities[0], &"nonexistent".to_string()).is_none()); + + // Update + assert_eq!(cache.insert(affinities[0], "key1".to_string(), 200), Some(100)); + assert_eq!(cache.get(affinities[0], &"key1".to_string()), Some(200)); + + // Remove + assert_eq!(cache.remove(affinities[0], &"key1".to_string()), Some(200)); + assert!(cache.get(affinities[0], &"key1".to_string()).is_none()); + + // Clear + cache.insert(affinities[0], "a".to_string(), 1); + cache.insert(affinities[1], "b".to_string(), 2); + cache.clear(); + assert!(cache.is_empty()); + } + + #[test] + fn test_cache_clone() { + let affinities = create_manual_pinned_affinities(&[1, 1]); + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(10) + .build(); + + cache.insert(affinities[0], "key".to_string(), 42); + + let cache_clone = cache.clone(); + + // Both should see the same data (shared Arc) + assert_eq!(cache_clone.get(affinities[0], &"key".to_string()), Some(42)); + + // Modifications through one should be visible through the other + cache_clone.insert(affinities[1], "key2".to_string(), 100); + assert_eq!(cache.get(affinities[1], &"key2".to_string()), Some(100)); + } + + #[test] + fn test_cache_send_sync() { + fn assert_send_sync() {} + assert_send_sync::>(); + } + + #[test] + fn test_affinity_shard_selection() { + // Create 4 NUMA nodes with 1 processor each + let affinities = create_manual_pinned_affinities(&[1, 1, 1, 1]); + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(100) + .build(); + + // Each affinity should map to a different shard + let mut shard_indices = 
[ + cache.shard_index_for_affinity(affinities[0]), + cache.shard_index_for_affinity(affinities[1]), + cache.shard_index_for_affinity(affinities[2]), + cache.shard_index_for_affinity(affinities[3]), + ]; + + // With 4 affinities from 4 different NUMA nodes and 4 shards, each should map to a unique shard + shard_indices.sort_unstable(); + assert_eq!(shard_indices, [0, 1, 2, 3]); + } + + #[test] + fn test_affinity_based_operations() { + // Create 2 NUMA nodes with 1 processor each + let affinities = create_manual_pinned_affinities(&[1, 1]); + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(100) + .build(); + + let affinity0 = affinities[0]; + let affinity1 = affinities[1]; + + // Insert with affinity0 + assert!(cache.insert(affinity0, "key0".to_string(), 100).is_none()); + + // Insert with affinity1 + assert!(cache.insert(affinity1, "key1".to_string(), 200).is_none()); + + // Get with correct affinities + assert_eq!(cache.get(affinity0, &"key0".to_string()), Some(100)); + assert_eq!(cache.get(affinity1, &"key1".to_string()), Some(200)); + + // Remove with affinity + assert_eq!(cache.remove(affinity0, &"key0".to_string()), Some(100)); + assert!(cache.get(affinity0, &"key0".to_string()).is_none()); + } + + #[test] + fn test_affinity_locality() { + // Create 4 NUMA nodes with 1 processor each + let affinities = create_manual_pinned_affinities(&[1, 1, 1, 1]); + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(100) + .build(); + + // Insert data using specific affinities + for (i, &affinity) in affinities.iter().enumerate() { + for j in 0u64..10 { + let key = (i as u64) * 100 + j; + cache.insert(affinity, key, key * 10); + } + } + + // Verify data is in the expected shards + for (i, &affinity) in affinities.iter().enumerate() { + let shard_idx = cache.shard_index_for_affinity(affinity); + let shard = cache.shard(shard_idx); + + // Data inserted with this affinity should be in this shard + for j in 0u64..10 { + let key = (i as u64) * 100 + j; + assert_eq!(shard.get(&key), Some(key * 10), "key {key} should be in shard {shard_idx}"); + } + } + } +} diff --git a/crates/thread_aware_cache/src/lib.rs b/crates/thread_aware_cache/src/lib.rs new file mode 100644 index 00000000..6657a4bc --- /dev/null +++ b/crates/thread_aware_cache/src/lib.rs @@ -0,0 +1,136 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#![cfg_attr(docsrs, feature(doc_cfg))] +#![cfg_attr(coverage_nightly, feature(coverage_attribute))] + +//! A high-performance, NUMA-aware in-memory cache with SIEVE eviction. +//! +//! This crate provides [`NumaCache`], a cache designed for high read-throughput and low-latency +//! on multi-socket architectures. It combines several techniques: +//! +//! 1. **Topology-Aware Sharding:** Data is partitioned by physical NUMA nodes to minimize +//! QPI/UPI interconnect traffic. +//! 2. **Swiss Table Storage:** Utilizes [`hashbrown`] for SIMD-accelerated lookups. +//! 3. **SIEVE Eviction Policy:** A scan-resistant, efficient eviction algorithm that outperforms +//! LRU in concurrent environments by minimizing metadata writes on reads. +//! 4. **Thread-Aware Integration:** Built on top of [`thread_aware`] for true NUMA locality +//! via [`PinnedAffinity`]-based shard routing. +//! 5. **Read-Through Replication:** Automatic cross-shard cloning promotes hot data to the +//! local shard, improving locality for subsequent accesses. +//! 6. 
**NUMA-Aware Memory Allocation:** When a [`ThreadRegistry`] is provided, shard memory
+//!    is allocated while pinned to the correct NUMA node via first-touch policy.
+//!
+//! # Architecture
+//!
+//! The cache operates on a **Shared-Nothing-per-Node** model with **automatic locality promotion**.
+//! Rather than sharding by key hash (which causes random memory access across sockets),
+//! we shard by **Thread Affinity**.
+//!
+//! When a key is requested from a shard where it doesn't exist, but exists in another shard,
+//! the value is automatically cloned to the local shard. This "read-through with local caching"
+//! approach ensures:
+//!
+//! - **Fast path:** Local lookups are O(1) with zero interconnect traffic
+//! - **Automatic locality:** Hot data migrates to where it's being accessed
+//! - **Independent eviction:** Each shard manages its own capacity via SIEVE
+//!
+//! Each shard is cache-line aligned (64 bytes) to prevent false sharing between locks.
+//!
+//! # Performance Characteristics
+//!
+//! | Metric | Complexity | Notes |
+//! | :--- | :--- | :--- |
+//! | **Lookup (Local Hit)** | $O(1)$ | Zero interconnect traffic. |
+//! | **Lookup (Remote Hit)** | $O(n)$ shards | Clones to the local shard for future $O(1)$ access. |
+//! | **Insertion** | Amortized $O(1)$ | Includes a potential eviction scan. |
+//! | **Eviction** | Amortized $O(1)$ | SIEVE hand movement is minimal in practice. |
+//! | **Removal** | $O(n)$ shards | Removes from all shards (due to replication). |
+//! | **Concurrency** | Sharded `RwLock` | No false sharing due to explicit padding. |
+//!
+//! # Example
+//!
+//! Use the `affinities()` builder method to create a cache with one shard per affinity,
+//! ensuring each shard is explicitly associated with a specific affinity (e.g., a NUMA node):
+//!
+//! ```
+//! use thread_aware_cache::{NumaCache, PinnedAffinity};
+//! use thread_aware::create_manual_pinned_affinities;
+//!
+//! // Create affinities representing 4 NUMA nodes with 1 processor each
+//! let affinities = create_manual_pinned_affinities(&[1, 1, 1, 1]);
+//!
+//! // Create a cache with one shard per affinity
+//! let cache = NumaCache::<String, u64>::builder()
+//!     .affinities(&affinities)
+//!     .capacity_per_shard(10000)
+//!     .build();
+//!
+//! // The cache now has exactly 4 shards, one per affinity
+//! assert_eq!(cache.num_shards(), 4);
+//!
+//! // Insert data from NUMA node 0
+//! cache.insert(affinities[0], "key".to_string(), 42);
+//!
+//! // Access from node 0 (local hit - fast path)
+//! assert_eq!(cache.get(affinities[0], &"key".to_string()), Some(42));
+//!
+//! // Access from node 1 (cross-shard, clones locally for future access)
+//! assert_eq!(cache.get(affinities[1], &"key".to_string()), Some(42));
+//!
+//! // Future accesses from node 1 are now local hits
+//! assert_eq!(cache.get(affinities[1], &"key".to_string()), Some(42));
+//! ```
+//!
+//! # Cross-Shard Behavior
+//!
+//! The cache automatically handles cross-shard access patterns:
+//!
+//! - **`get()`**: Checks the local shard first, then searches the others. On a remote hit, clones to the local shard.
+//! - **`insert()`**: Always inserts into the specified affinity's shard.
+//! - **`remove()`**: Removes from ALL shards (since values may be replicated).
+//!
+//! This design is ideal for read-heavy workloads where data naturally becomes "hot" on certain
+//! nodes and should migrate there for optimal NUMA locality.
+//!
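+//! As a small illustration of these rules (using only the API shown above; the key, value
+//! type, and capacity are arbitrary), replication is visible through `len()`, which counts
+//! entries across all shards:
+//!
+//! ```
+//! use thread_aware_cache::NumaCache;
+//! use thread_aware::create_manual_pinned_affinities;
+//!
+//! let affinities = create_manual_pinned_affinities(&[1, 1]);
+//! let cache = NumaCache::<String, u64>::builder()
+//!     .affinities(&affinities)
+//!     .capacity_per_shard(8)
+//!     .build();
+//!
+//! // Written on node 0 only.
+//! cache.insert(affinities[0], "hot".to_string(), 7);
+//! assert_eq!(cache.len(), 1);
+//!
+//! // A read from node 1 finds the value remotely and clones it into node 1's shard.
+//! assert_eq!(cache.get(affinities[1], &"hot".to_string()), Some(7));
+//! assert_eq!(cache.len(), 2);
+//!
+//! // remove() clears every replica, regardless of which affinity issues it.
+//! assert_eq!(cache.remove(affinities[1], &"hot".to_string()), Some(7));
+//! assert!(cache.is_empty());
+//! ```
+//!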
+//! # SIEVE Algorithm
+//!
+//! SIEVE is superior to LRU because it does not require pointer manipulation on reads,
+//! only a boolean flag update. This makes it particularly efficient in concurrent environments.
+//!
+//! On access, we simply set a `visited` flag to `true`. On eviction, we scan from the "hand"
+//! position, clearing `visited` flags until we find an unvisited entry to evict.
+//!
+//! # Bloom Filter Optimization
+//!
+//! The cache uses a shared lock-free Bloom filter to optimize cross-shard lookups. Before
+//! searching all shards for a key, the cache queries the Bloom filter:
+//!
+//! - **Negative result:** The key definitely doesn't exist → skip the cross-shard search (O(1))
+//! - **Positive result:** The key might exist → proceed with the cross-shard search
+//!
+//! The Bloom filter is sized for a ~1% false positive rate and uses atomic operations for
+//! lock-free concurrent access. Note that removals don't clear bits from the filter, so
+//! stale positives may occur (safe, just slower), but false negatives never occur.
+//!
+//! # References
+//!
+//! 1. **Swiss Table:** Google Abseil `flat_hash_map`.
+//! 2. **SIEVE:** *SIEVE is Simpler than LRU: an Efficient Turn-Key Eviction Algorithm for Web
+//!    Caches* (NSDI '24).
+//! 3. **False Sharing:** Intel Developer Guide on cache line definitions (64 bytes).
+//! 4. **NUMA Replication:** Similar to OS page migration strategies for hot data.
+//! 5. **Bloom Filter:** Kirsch-Mitzenmacher optimization for deriving multiple hash functions.
+
+mod bloom;
+mod cache;
+mod shard;
+mod sieve;
+
+pub use cache::{NumaCache, NumaCacheBuilder};
+
+// Re-export thread_aware types for convenience
+pub use thread_aware::{PinnedAffinity, ProcessorCount, ThreadRegistry};
+
+#[cfg(test)]
+mod tests;
diff --git a/crates/thread_aware_cache/src/shard.rs b/crates/thread_aware_cache/src/shard.rs
new file mode 100644
index 00000000..4d6f7369
--- /dev/null
+++ b/crates/thread_aware_cache/src/shard.rs
@@ -0,0 +1,266 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+//! Cache shard implementation.
+//!
+//! Each shard contains a Swiss Table (`hashbrown::HashMap`) for storage and SIEVE metadata
+//! for eviction. Shards are cache-line aligned to prevent false sharing.
+
+use std::hash::{BuildHasher, Hash};
+
+use hashbrown::DefaultHashBuilder;
+use hashbrown::HashMap;
+use parking_lot::RwLock;
+
+use crate::sieve::{NodeIndex, SieveList};
+
+/// Cache line size for alignment to prevent false sharing.
+const CACHE_LINE_SIZE: usize = 64;
+
+/// A single cache shard containing data and eviction metadata.
+///
+/// Aligned to the CPU cache line (64 bytes) to prevent cache-line bouncing between locks.
+#[repr(align(64))]
+pub struct NumaShard<K, V, S = DefaultHashBuilder> {
+    /// The protected inner state.
+    inner: RwLock<ShardInner<K, V, S>>,
+    /// Explicit padding to ensure the lock of the next shard resides on a different cache line.
+    _pad: [u8; CACHE_LINE_SIZE],
+}
+
+impl<K, V, S: Default> NumaShard<K, V, S> {
+    /// Creates a new shard with the given capacity.
+    #[must_use]
+    pub fn new(capacity: usize) -> Self {
+        Self {
+            inner: RwLock::new(ShardInner::new(capacity)),
+            _pad: [0; CACHE_LINE_SIZE],
+        }
+    }
+}
+
+impl<K, V, S> NumaShard<K, V, S>
+where
+    K: Eq + Hash + Clone,
+    V: Clone,
+    S: BuildHasher,
+{
+    /// Looks up a key in the shard.
+    ///
+    /// Returns `Some(value)` if found, marking the entry as visited for SIEVE.
+    pub fn get(&self, key: &K) -> Option<V> {
+        let inner = self.inner.read();
+        inner.get(key)
+    }
+
+    /// Inserts a key-value pair into the shard.
+    ///
+    /// If the shard is at capacity, performs SIEVE eviction first.
+    /// Returns the previous value if the key already existed.
+ pub fn insert(&self, key: K, value: V) -> Option { + let mut inner = self.inner.write(); + inner.insert(key, value) + } + + /// Removes a key from the shard. + /// + /// Returns the previous value if the key existed. + pub fn remove(&self, key: &K) -> Option { + let mut inner = self.inner.write(); + inner.remove(key) + } + + /// Returns the number of entries in the shard. + #[must_use] + pub fn len(&self) -> usize { + let inner = self.inner.read(); + inner.map.len() + } + + /// Returns `true` if the shard is empty. + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Returns the capacity of the shard. + #[must_use] + pub fn capacity(&self) -> usize { + let inner = self.inner.read(); + inner.sieve.capacity() + } + + /// Clears all entries from the shard. + pub fn clear(&self) { + let mut inner = self.inner.write(); + inner.clear(); + } +} + +impl std::fmt::Debug for NumaShard +where + K: Eq + std::hash::Hash + Clone, + V: Clone, + S: std::hash::BuildHasher, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NumaShard") + .field("len", &self.len()) + .field("capacity", &self.capacity()) + .finish() + } +} + +/// Inner state of a cache shard. +struct ShardInner { + /// The primary storage using Swiss Table for SIMD-accelerated lookup. + map: HashMap, S>, + + /// SIEVE eviction state. + sieve: SieveList, +} + +impl ShardInner { + /// Creates a new shard inner with the given capacity. + fn new(capacity: usize) -> Self { + Self { + map: HashMap::with_capacity_and_hasher(capacity, S::default()), + sieve: SieveList::new(capacity), + } + } +} + +impl ShardInner +where + K: Eq + Hash + Clone, + V: Clone, + S: BuildHasher, +{ + /// Looks up a key in the shard. + fn get(&self, key: &K) -> Option { + let entry = self.map.get(key)?; + // Mark as visited for SIEVE (relaxed ordering is fine here) + self.sieve.mark_visited(entry.sieve_index); + Some(entry.value.clone()) + } + + /// Inserts a key-value pair. + fn insert(&mut self, key: K, value: V) -> Option { + // Check if key already exists + if let Some(entry) = self.map.get_mut(&key) { + let old_value = std::mem::replace(&mut entry.value, value); + self.sieve.mark_visited(entry.sieve_index); + return Some(old_value); + } + + // Evict if at capacity + if self.sieve.is_full() { + self.evict_one(); + } + + // Insert new entry - store a clone of the key in the sieve for O(1) eviction + let sieve_index = self.sieve.insert(key.clone()).expect("should have space after eviction"); + + self.map.insert(key, CacheEntry { value, sieve_index }); + + None + } + + /// Removes a key from the shard. + fn remove(&mut self, key: &K) -> Option { + let entry = self.map.remove(key)?; + self.sieve.remove(entry.sieve_index); + Some(entry.value) + } + + /// Evicts one entry using the SIEVE algorithm. + /// + /// This is now O(1) because the sieve stores the key directly, + /// avoiding the need to iterate through the map. + fn evict_one(&mut self) { + if let Some(evicted_key) = self.sieve.evict() { + // Direct O(1) removal using the evicted key + self.map.remove(&evicted_key); + } + } + + /// Clears all entries. + fn clear(&mut self) { + self.map.clear(); + self.sieve = SieveList::new(self.sieve.capacity()); + } +} + +/// A cache entry storing the value and its SIEVE metadata index. +struct CacheEntry { + /// The cached value. + value: V, + /// Index into the SIEVE list. 
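+    ///
+    /// Storing this index is what keeps the hot paths cheap: a `get` hit only flips the
+    /// node's `visited` flag through it, and `remove` can unlink the node directly instead
+    /// of scanning the SIEVE list for the key.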
+ sieve_index: NodeIndex, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_shard_basic_operations() { + let shard: NumaShard = NumaShard::new(10); + + assert!(shard.is_empty()); + assert_eq!(shard.capacity(), 10); + + // Insert + assert!(shard.insert("key1".to_string(), 100).is_none()); + assert!(shard.insert("key2".to_string(), 200).is_none()); + assert_eq!(shard.len(), 2); + + // Get + assert_eq!(shard.get(&"key1".to_string()), Some(100)); + assert_eq!(shard.get(&"key2".to_string()), Some(200)); + assert_eq!(shard.get(&"key3".to_string()), None); + + // Update + assert_eq!(shard.insert("key1".to_string(), 150), Some(100)); + assert_eq!(shard.get(&"key1".to_string()), Some(150)); + + // Remove + assert_eq!(shard.remove(&"key1".to_string()), Some(150)); + assert_eq!(shard.get(&"key1".to_string()), None); + assert_eq!(shard.len(), 1); + + // Clear + shard.clear(); + assert!(shard.is_empty()); + } + + #[test] + fn test_shard_eviction() { + let shard: NumaShard = NumaShard::new(3); + + // Fill the shard + shard.insert(1, 100); + shard.insert(2, 200); + shard.insert(3, 300); + assert_eq!(shard.len(), 3); + + // Access some entries to mark them as visited + let _ = shard.get(&1); + let _ = shard.get(&2); + + // Insert a new entry, triggering eviction + shard.insert(4, 400); + assert_eq!(shard.len(), 3); + + // The entry that was not accessed (3) should have been evicted + // Note: SIEVE eviction order may vary, so we just check the count + let count = [1, 2, 3, 4].iter().filter(|k| shard.get(k).is_some()).count(); + assert_eq!(count, 3); + } + + #[test] + fn test_shard_alignment() { + // Verify that NumaShard is properly aligned + assert!(std::mem::align_of::>() >= 64); + } +} diff --git a/crates/thread_aware_cache/src/sieve.rs b/crates/thread_aware_cache/src/sieve.rs new file mode 100644 index 00000000..c248216a --- /dev/null +++ b/crates/thread_aware_cache/src/sieve.rs @@ -0,0 +1,429 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! SIEVE eviction data structures. +//! +//! This module contains the metadata nodes and list management for the SIEVE eviction policy. +//! We decouple value storage from eviction metadata to improve cache locality during eviction scans. + +// These truncations are intentional - NodeIndex is u32, which is sufficient for cache capacities +#![expect( + clippy::cast_possible_truncation, + reason = "NodeIndex is u32, which is sufficient for expected cache capacities" +)] + +use std::sync::atomic::{AtomicBool, Ordering}; + +/// Index type for SIEVE nodes to reduce memory footprint. +pub type NodeIndex = u32; + +/// Sentinel value indicating no node (null pointer equivalent). +pub const NULL_INDEX: NodeIndex = NodeIndex::MAX; + +/// Metadata node for SIEVE eviction tracking. +/// +/// Each entry in the cache has a corresponding `SieveNode` that tracks eviction metadata. +/// Indices are `u32` to reduce memory footprint compared to `usize`. +/// +/// The node stores a clone of the key to enable O(1) lookup during eviction, avoiding +/// the need to iterate through the entire map to find the key by hash. +#[derive(Debug)] +pub struct SieveNode { + /// A clone of the key, stored for O(1) map removal during eviction. + pub key: Option, + + /// The "Second Chance" bit. + /// - Set to `true` on Access. + /// - Checked/Cleared during Eviction. + /// + /// Using `AtomicBool` allows setting this with `Relaxed` ordering during reads, + /// avoiding the need to upgrade to a write lock on cache hits. 
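+    ///
+    /// `Relaxed` is enough here because the flag is only an eviction hint: if a concurrent
+    /// eviction misses a freshly set flag it may evict a recently touched entry, but it can
+    /// never corrupt the map or the list, which are guarded by the shard's `RwLock`.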
+ pub visited: AtomicBool, + + /// Index of the next node in the doubly-linked list. + pub next: NodeIndex, + + /// Index of the previous node in the doubly-linked list. + pub prev: NodeIndex, + + /// Indicates whether this node slot is currently in use. + pub in_use: bool, +} + +impl SieveNode { + /// Creates a new uninitialized node. + #[must_use] + pub const fn new() -> Self { + Self { + key: None, + visited: AtomicBool::new(false), + next: NULL_INDEX, + prev: NULL_INDEX, + in_use: false, + } + } + + /// Initializes this node with the given key. + pub fn init(&mut self, key: K) { + self.key = Some(key); + self.visited.store(false, Ordering::Relaxed); + self.next = NULL_INDEX; + self.prev = NULL_INDEX; + self.in_use = true; + } + + /// Clears this node, marking it as unused. + pub fn clear(&mut self) { + self.key = None; + self.in_use = false; + self.next = NULL_INDEX; + self.prev = NULL_INDEX; + } + + /// Marks this node as visited (accessed). + pub fn mark_visited(&self) { + self.visited.store(true, Ordering::Relaxed); + } + + /// Checks if this node was visited and clears the flag. + /// + /// Returns `true` if the node was visited. + pub fn check_and_clear_visited(&self) -> bool { + self.visited.swap(false, Ordering::Relaxed) + } +} + +impl Default for SieveNode { + fn default() -> Self { + Self::new() + } +} + +/// Manages the SIEVE eviction state for a shard. +#[derive(Debug)] +pub struct SieveList { + /// Pre-allocated slab of metadata nodes. + nodes: Vec>, + + /// The "Clock Hand" cursor for eviction. + hand: Option, + + /// Head of the linked list (most recently inserted). + head: Option, + + /// Tail of the linked list (oldest). + tail: Option, + + /// Free list head for recycling node slots. + free_head: Option, + + /// Current number of entries in the list. + len: usize, + + /// Maximum capacity. + capacity: usize, +} + +impl SieveList { + /// Creates a new SIEVE list with the given capacity. + #[must_use] + pub fn new(capacity: usize) -> Self { + let mut nodes = Vec::with_capacity(capacity); + nodes.resize_with(capacity, SieveNode::new); + + // Initialize free list by chaining all nodes together + for (i, node) in nodes.iter_mut().enumerate().take(capacity) { + if let Some(next) = (i + 1 < capacity).then(|| (i + 1) as NodeIndex) { + node.next = next; + } + } + + let free_head = (capacity > 0).then_some(0); + + Self { + nodes, + hand: None, + head: None, + tail: None, + free_head, + len: 0, + capacity, + } + } + + /// Returns the current number of entries. + /// + /// This method is primarily useful for testing and debugging. + #[cfg(test)] + #[must_use] + pub const fn len(&self) -> usize { + self.len + } + + /// Returns `true` if the list is empty. + #[must_use] + pub const fn is_empty(&self) -> bool { + self.len == 0 + } + + /// Returns `true` if the list is at capacity. + #[must_use] + pub const fn is_full(&self) -> bool { + self.len >= self.capacity + } + + /// Returns the capacity. + #[must_use] + pub const fn capacity(&self) -> usize { + self.capacity + } + + /// Allocates a new node slot from the free list. + /// + /// Returns `None` if no slots are available. + fn alloc_node(&mut self) -> Option { + let idx = self.free_head?; + let next_free = self.nodes[idx as usize].next; + self.free_head = if next_free == NULL_INDEX { None } else { Some(next_free) }; + Some(idx) + } + + /// Returns a node slot to the free list. 
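+    ///
+    /// Freed slots are chained through their `next` fields, so allocation and release are
+    /// both O(1) and the node slab never reallocates after construction.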
+ fn free_node(&mut self, idx: NodeIndex) { + self.nodes[idx as usize].clear(); + self.nodes[idx as usize].next = self.free_head.unwrap_or(NULL_INDEX); + self.free_head = Some(idx); + } + + /// Inserts a new entry at the head of the list. + /// + /// Returns the node index if successful, or `None` if the list is full. + pub fn insert(&mut self, key: K) -> Option { + let idx = self.alloc_node()?; + self.nodes[idx as usize].init(key); + + // Insert at head + if let Some(old_head) = self.head { + self.nodes[idx as usize].next = old_head; + self.nodes[old_head as usize].prev = idx; + } + self.head = Some(idx); + + if self.tail.is_none() { + self.tail = Some(idx); + } + + // Initialize hand if this is the first entry + if self.hand.is_none() { + self.hand = Some(idx); + } + + self.len += 1; + Some(idx) + } + + /// Removes a node from the list by index. + pub fn remove(&mut self, idx: NodeIndex) { + let node = &self.nodes[idx as usize]; + if !node.in_use { + return; + } + + let prev = node.prev; + let next = node.next; + + // Update neighbors + if prev == NULL_INDEX { + // This was the head + self.head = if next == NULL_INDEX { None } else { Some(next) }; + } else { + self.nodes[prev as usize].next = next; + } + + if next == NULL_INDEX { + // This was the tail + self.tail = if prev == NULL_INDEX { None } else { Some(prev) }; + } else { + self.nodes[next as usize].prev = prev; + } + + // Update hand if it pointed to this node + if self.hand == Some(idx) { + self.hand = if prev != NULL_INDEX { + Some(prev) + } else if next != NULL_INDEX { + Some(next) + } else { + None + }; + } + + self.free_node(idx); + self.len -= 1; + } + + /// Marks a node as visited. + pub fn mark_visited(&self, idx: NodeIndex) { + if (idx as usize) < self.nodes.len() { + self.nodes[idx as usize].mark_visited(); + } + } + + /// Finds and removes a victim for eviction using the SIEVE algorithm. + /// + /// Returns the key of the evicted entry for O(1) map removal. 
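+    ///
+    /// Worked example: insert keys 1, 2, 3 (the hand starts at 1, the oldest entry) and then
+    /// touch 1 and 2. Eviction clears the visited flags of 1 and 2 while walking `prev`
+    /// toward the head, and evicts 3 as the first unvisited entry it reaches.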
+ pub fn evict(&mut self) -> Option { + if self.is_empty() { + return None; + } + + let start = self.hand?; + let mut cursor = start; + let mut iterations = 0; + let max_iterations = self.len * 2; // Safety limit to prevent infinite loops + + loop { + if iterations >= max_iterations { + // Fallback: just evict the current cursor position + break; + } + iterations += 1; + + let node = &self.nodes[cursor as usize]; + if !node.in_use { + // Skip unused nodes (shouldn't happen normally) + cursor = if node.prev == NULL_INDEX { + self.tail.unwrap_or(start) + } else { + node.prev + }; + continue; + } + + if node.check_and_clear_visited() { + // Node was visited, give it a second chance + cursor = if node.prev == NULL_INDEX { + // Wrap around to tail + self.tail.unwrap_or(cursor) + } else { + node.prev + }; + } else { + // Found a victim + break; + } + } + + // Take the key from the node before removing + let key = self.nodes[cursor as usize].key.take(); + + // Move hand to the previous position before removing + let prev = self.nodes[cursor as usize].prev; + self.hand = if prev == NULL_INDEX { self.tail } else { Some(prev) }; + + self.remove(cursor); + + key + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_sieve_node_lifecycle() { + let mut node: SieveNode = SieveNode::new(); + assert!(!node.in_use); + + node.init(12345); + assert!(node.in_use); + assert_eq!(node.key, Some(12345)); + assert!(!node.visited.load(Ordering::Relaxed)); + + node.mark_visited(); + assert!(node.visited.load(Ordering::Relaxed)); + + assert!(node.check_and_clear_visited()); + assert!(!node.visited.load(Ordering::Relaxed)); + + node.clear(); + assert!(!node.in_use); + assert!(node.key.is_none()); + } + + #[test] + fn test_sieve_list_basic() { + let mut list: SieveList = SieveList::new(10); + assert!(list.is_empty()); + assert!(!list.is_full()); + assert_eq!(list.capacity(), 10); + + // Insert some entries + let idx1 = list.insert(100).expect("should insert"); + let idx2 = list.insert(200).expect("should insert"); + let idx3 = list.insert(300).expect("should insert"); + + assert_eq!(list.len(), 3); + assert!(!list.is_empty()); + + // Verify keys are stored + assert_eq!(list.nodes[idx1 as usize].key, Some(100)); + assert_eq!(list.nodes[idx2 as usize].key, Some(200)); + assert_eq!(list.nodes[idx3 as usize].key, Some(300)); + + // Remove middle entry + list.remove(idx2); + assert_eq!(list.len(), 2); + } + + #[test] + fn test_sieve_list_full() { + let mut list: SieveList = SieveList::new(3); + + list.insert(1).expect("should insert"); + list.insert(2).expect("should insert"); + list.insert(3).expect("should insert"); + + assert!(list.is_full()); + assert!(list.insert(4).is_none()); + } + + #[test] + fn test_sieve_eviction() { + let mut list: SieveList = SieveList::new(3); + + let idx1 = list.insert(100).expect("should insert"); + let idx2 = list.insert(200).expect("should insert"); + let _idx3 = list.insert(300).expect("should insert"); + + // Mark idx1 and idx2 as visited + list.mark_visited(idx1); + list.mark_visited(idx2); + + // Evict should find idx3 (the unvisited one) + let key = list.evict().expect("should evict"); + assert_eq!(key, 300); + assert_eq!(list.len(), 2); + } + + #[test] + fn test_sieve_eviction_second_chance() { + let mut list: SieveList = SieveList::new(3); + + let idx1 = list.insert(100).expect("should insert"); + let idx2 = list.insert(200).expect("should insert"); + let idx3 = list.insert(300).expect("should insert"); + + // Mark all as visited + list.mark_visited(idx1); + 
list.mark_visited(idx2); + list.mark_visited(idx3); + + // First eviction should clear visited flags and eventually evict one + let key = list.evict().expect("should evict"); + // One of them should be evicted after clearing visited flags + assert!(key == 100 || key == 200 || key == 300); + assert_eq!(list.len(), 2); + } +} diff --git a/crates/thread_aware_cache/src/tests.rs b/crates/thread_aware_cache/src/tests.rs new file mode 100644 index 00000000..f00c5594 --- /dev/null +++ b/crates/thread_aware_cache/src/tests.rs @@ -0,0 +1,487 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Integration tests for the NUMA cache. + +use crate::NumaCache; +use thread_aware::create_manual_pinned_affinities; + +#[test] +fn test_eviction_under_pressure() { + let affinities = create_manual_pinned_affinities(&[1]); + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(5) + .build(); + let affinity = affinities[0]; + + // Fill the cache + for i in 0..5 { + cache.insert(affinity, i, i * 10); + } + assert_eq!(cache.len(), 5); + + // Access some entries to mark them as visited + let _ = cache.get(affinity, &0); + let _ = cache.get(affinity, &1); + + // Insert more entries, triggering evictions + for i in 5..10 { + cache.insert(affinity, i, i * 10); + } + + // Should still have exactly 5 entries + assert_eq!(cache.len(), 5); + + // The most recently inserted and accessed entries should be present + // SIEVE should have evicted unvisited entries first + let mut found = 0; + for i in 0..10 { + if cache.get(affinity, &i).is_some() { + found += 1; + } + } + assert_eq!(found, 5); +} + +#[test] +fn test_multi_shard_distribution() { + let affinities = create_manual_pinned_affinities(&[1, 1, 1, 1]); + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(100) + .build(); + + // Insert entries across all affinities + for (i, &affinity) in affinities.iter().enumerate() { + let base = i32::try_from(i).expect("index fits in i32") * 100; + for j in 0..25 { + let key = base + j; + cache.insert(affinity, key, key); + } + } + + // All entries should be retrievable from their respective affinities + for (i, &affinity) in affinities.iter().enumerate() { + let base = i32::try_from(i).expect("index fits in i32") * 100; + for j in 0..25 { + let key = base + j; + assert_eq!(cache.get(affinity, &key), Some(key), "key {key} should be present"); + } + } + + // Check that entries are distributed across shards + let mut shard_counts = [0usize; 4]; + for (i, count) in shard_counts.iter_mut().enumerate() { + *count = cache.shard(i).len(); + } + + // Each shard should have exactly 25 entries + for (i, &count) in shard_counts.iter().enumerate() { + assert_eq!(count, 25, "shard {i} should have 25 entries"); + } +} + +#[test] +fn test_concurrent_access() { + use std::sync::Arc; + use std::thread; + + let affinities = create_manual_pinned_affinities(&[1, 1, 1, 1]); + let cache = Arc::new( + NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(1000) + .build(), + ); + let affinities = Arc::new(affinities); + + let mut handles = vec![]; + + // Spawn writer threads + for t in 0..4 { + let cache = Arc::clone(&cache); + let affinities = Arc::clone(&affinities); + handles.push(thread::spawn(move || { + let affinity = affinities[t]; + let base = i32::try_from(t).expect("index fits in i32") * 1000; + for i in 0..250 { + let key = base + i; + cache.insert(affinity, key, key); + } + })); + } + + // Spawn reader threads + for t in 0..4 { + 
let cache = Arc::clone(&cache); + let affinities = Arc::clone(&affinities); + handles.push(thread::spawn(move || { + let affinity = affinities[t]; + let base = i32::try_from(t).expect("index fits in i32") * 1000; + for i in 0..250 { + let key = base + i; + // May or may not find the key depending on timing + let _ = cache.get(affinity, &key); + } + })); + } + + // Wait for all threads to complete + for handle in handles { + handle.join().expect("thread should not panic"); + } + + // Cache should have entries from all writer threads + assert!(!cache.is_empty()); +} + +#[test] +fn test_update_marks_visited() { + let affinities = create_manual_pinned_affinities(&[1]); + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(3) + .build(); + let affinity = affinities[0]; + + // Insert 3 entries + cache.insert(affinity, 1, 10); + cache.insert(affinity, 2, 20); + cache.insert(affinity, 3, 30); + + // Update entry 1 (should mark it as visited) + cache.insert(affinity, 1, 15); + + // Insert a new entry, triggering eviction + cache.insert(affinity, 4, 40); + + // Entry 1 should still exist (it was visited via update) + assert_eq!(cache.get(affinity, &1), Some(15)); +} + +#[test] +fn test_get_marks_visited() { + let affinities = create_manual_pinned_affinities(&[1]); + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(3) + .build(); + let affinity = affinities[0]; + + // Insert 3 entries + cache.insert(affinity, 1, 10); + cache.insert(affinity, 2, 20); + cache.insert(affinity, 3, 30); + + // Access entry 2 (should mark it as visited) + assert_eq!(cache.get(affinity, &2), Some(20)); + + // Insert a new entry, triggering eviction + cache.insert(affinity, 4, 40); + + // Entry 2 should still exist (it was visited via get) + assert_eq!(cache.get(affinity, &2), Some(20)); +} + +#[test] +fn test_remove_frees_slot() { + let affinities = create_manual_pinned_affinities(&[1]); + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(3) + .build(); + let affinity = affinities[0]; + + // Fill the cache + cache.insert(affinity, 1, 10); + cache.insert(affinity, 2, 20); + cache.insert(affinity, 3, 30); + assert_eq!(cache.len(), 3); + + // Remove one entry + cache.remove(affinity, &2); + assert_eq!(cache.len(), 2); + + // Should be able to insert without eviction + cache.insert(affinity, 4, 40); + assert_eq!(cache.len(), 3); + + // All three remaining entries should be accessible + assert_eq!(cache.get(affinity, &1), Some(10)); + assert_eq!(cache.get(affinity, &3), Some(30)); + assert_eq!(cache.get(affinity, &4), Some(40)); +} + +#[test] +fn test_string_keys() { + let affinities = create_manual_pinned_affinities(&[1, 1]); + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(10) + .build(); + + cache.insert(affinities[0], "hello".to_string(), "world".to_string()); + cache.insert(affinities[1], "foo".to_string(), "bar".to_string()); + + assert_eq!(cache.get(affinities[0], &"hello".to_string()), Some("world".to_string())); + assert_eq!(cache.get(affinities[1], &"foo".to_string()), Some("bar".to_string())); +} + +#[test] +fn test_cross_shard_get_clones_locally() { + // Test that getting a key from a different shard clones it locally + let affinities = create_manual_pinned_affinities(&[1, 1]); + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(10) + .build(); + + // Insert on shard 0 + cache.insert(affinities[0], "key".to_string(), 42); + 
assert_eq!(cache.shard(0).len(), 1); + assert_eq!(cache.shard(1).len(), 0); + + // Get from shard 1 - should find in shard 0 and clone to shard 1 + assert_eq!(cache.get(affinities[1], &"key".to_string()), Some(42)); + + // Now both shards should have the key + assert_eq!(cache.shard(0).len(), 1); + assert_eq!(cache.shard(1).len(), 1); + + // Both shards should return the value + assert_eq!(cache.get(affinities[0], &"key".to_string()), Some(42)); + assert_eq!(cache.get(affinities[1], &"key".to_string()), Some(42)); +} + +#[test] +fn test_cross_shard_get_local_first() { + // Test that local shard is checked first (fast path) + let affinities = create_manual_pinned_affinities(&[1, 1]); + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(10) + .build(); + + // Insert on shard 0 + cache.insert(affinities[0], "key".to_string(), 42); + + // Get from shard 0 - should find locally, no cross-shard lookup + assert_eq!(cache.get(affinities[0], &"key".to_string()), Some(42)); + + // Shard 1 should still be empty (no unnecessary cloning) + assert_eq!(cache.shard(1).len(), 0); +} + +#[test] +fn test_cross_shard_remove_clears_all() { + // Test that remove clears from all shards + let affinities = create_manual_pinned_affinities(&[1, 1, 1]); + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(10) + .build(); + + // Insert on shard 0 + cache.insert(affinities[0], "key".to_string(), 42); + + // Get from shard 1 and shard 2 - clones to both + let _ = cache.get(affinities[1], &"key".to_string()); + let _ = cache.get(affinities[2], &"key".to_string()); + + // All three shards should have the key + assert_eq!(cache.shard(0).len(), 1); + assert_eq!(cache.shard(1).len(), 1); + assert_eq!(cache.shard(2).len(), 1); + assert_eq!(cache.len(), 3); + + // Remove from any shard - should clear from ALL shards + assert_eq!(cache.remove(affinities[1], &"key".to_string()), Some(42)); + + // All shards should be empty now + assert_eq!(cache.shard(0).len(), 0); + assert_eq!(cache.shard(1).len(), 0); + assert_eq!(cache.shard(2).len(), 0); + + // Key should not be found anywhere + assert!(cache.get(affinities[0], &"key".to_string()).is_none()); + assert!(cache.get(affinities[1], &"key".to_string()).is_none()); + assert!(cache.get(affinities[2], &"key".to_string()).is_none()); +} + +#[test] +fn test_cross_shard_eviction_local_only() { + // Test that eviction is still shard-local + let affinities = create_manual_pinned_affinities(&[1, 1]); + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(3) + .build(); + + // Fill shard 0 with keys 1, 2, 3 + cache.insert(affinities[0], 1, 10); + cache.insert(affinities[0], 2, 20); + cache.insert(affinities[0], 3, 30); + + // Clone key 1 to shard 1 + let _ = cache.get(affinities[1], &1); + assert_eq!(cache.shard(1).len(), 1); + + // Fill shard 1 by adding more keys (causes eviction on shard 1 only) + cache.insert(affinities[1], 100, 1000); + cache.insert(affinities[1], 101, 1010); + // This insert may evict key 1 from shard 1 if it wasn't visited + cache.insert(affinities[1], 102, 1020); + + // Shard 0 should still have all 3 original keys + assert_eq!(cache.shard(0).len(), 3); + assert_eq!(cache.get(affinities[0], &1), Some(10)); + assert_eq!(cache.get(affinities[0], &2), Some(20)); + assert_eq!(cache.get(affinities[0], &3), Some(30)); +} + +#[test] +fn test_cross_shard_multiple_affinities() { + // Test cross-shard behavior with many affinities + let affinities = 
create_manual_pinned_affinities(&[1, 1, 1, 1]); + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(100) + .build(); + + // Insert data on each shard + for (i, &aff) in affinities.iter().enumerate() { + let idx = i32::try_from(i).expect("index fits in i32"); + cache.insert(aff, format!("key_{i}"), idx); + } + + // Each shard should have 1 entry + for i in 0..4 { + assert_eq!(cache.shard(i).len(), 1); + } + + // Access all keys from affinity 0 - should clone them all locally + for i in 0..4i32 { + assert_eq!(cache.get(affinities[0], &format!("key_{i}")), Some(i)); + } + + // Shard 0 should now have all 4 keys + assert_eq!(cache.shard(0).len(), 4); + + // Other shards still have their original key + for i in 1..4 { + assert_eq!(cache.shard(i).len(), 1); + } +} + +#[test] +fn test_cross_shard_concurrent_access() { + use std::sync::Arc; + use std::thread; + + let affinities = create_manual_pinned_affinities(&[1, 1, 1, 1]); + let cache = Arc::new( + NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(1000) + .build(), + ); + let affinities = Arc::new(affinities); + + // Insert some data on shard 0 + for i in 0..100 { + cache.insert(affinities[0], i, i * 10); + } + + let mut handles = vec![]; + + // Spawn reader threads on all affinities trying to access the same keys + for t in 0..4 { + let cache = Arc::clone(&cache); + let affinities = Arc::clone(&affinities); + handles.push(thread::spawn(move || { + let affinity = affinities[t]; + for i in 0..100 { + // All threads try to read the same keys, triggering cross-shard cloning + let result = cache.get(affinity, &i); + assert_eq!(result, Some(i * 10), "key {i} should be found from affinity {t}"); + } + })); + } + + // Wait for all threads to complete + for handle in handles { + handle.join().expect("thread should not panic"); + } + + // All keys should be accessible from any affinity + for &aff in affinities.iter() { + for i in 0..100 { + assert_eq!(cache.get(aff, &i), Some(i * 10)); + } + } +} + +#[test] +fn test_bloom_filter_optimization() { + // Test that the Bloom filter correctly identifies keys that don't exist + let affinities = create_manual_pinned_affinities(&[1, 1, 1, 1]); + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(100) + .build(); + + // Insert some keys on shard 0 + for i in 0..50 { + cache.insert(affinities[0], i, i * 10); + } + + // Keys that exist should be found (Bloom filter returns positive) + for i in 0..50 { + assert_eq!(cache.get(affinities[1], &i), Some(i * 10), "key {i} should be found"); + } + + // Keys that don't exist should return None efficiently + // (Bloom filter should return negative, avoiding cross-shard search) + for i in 1000..1050 { + assert!(cache.get(affinities[0], &i).is_none(), "key {i} should not exist"); + assert!(cache.get(affinities[1], &i).is_none(), "key {i} should not exist"); + } +} + +#[test] +fn test_bloom_filter_no_false_negatives_in_cache() { + // Verify the Bloom filter never causes false negatives + let affinities = create_manual_pinned_affinities(&[1, 1, 1, 1]); + let cache = NumaCache::::builder() + .affinities(&affinities) + .capacity_per_shard(1000) + .build(); + + // Insert keys across different shards + for (idx, &aff) in affinities.iter().enumerate() { + let base = i32::try_from(idx).expect("index fits in i32") * 1000; + for i in 0..100 { + cache.insert(aff, base + i, i); + } + } + + // All inserted keys MUST be found from any affinity + // (tests that Bloom filter never produces false 
negatives) + for (idx, _) in affinities.iter().enumerate() { + let base = i32::try_from(idx).expect("index fits in i32") * 1000; + for (check_idx, &check_aff) in affinities.iter().enumerate() { + for i in 0..100 { + let result = cache.get(check_aff, &(base + i)); + assert_eq!( + result, + Some(i), + "key {} should be found from affinity {} (inserted on affinity {})", + base + i, + check_idx, + idx + ); + } + } + } +}
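The crate documentation above describes `bloom.rs` as a shared, lock-free Bloom filter sized for a ~1% false-positive rate and built on the Kirsch-Mitzenmacher construction, but that file is not part of this excerpt. As a rough standalone sketch of that kind of structure — the `AtomicBloom` name, the sizing constants, and the `RandomState`-based hashing are illustrative choices, not the crate's actual implementation:

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash, Hasher};
use std::sync::atomic::{AtomicU64, Ordering};

/// Minimal lock-free Bloom filter: the k probe positions for an item are derived from two
/// base hashes via the Kirsch-Mitzenmacher construction g_i(x) = h1(x) + i * h2(x).
struct AtomicBloom {
    bits: Vec<AtomicU64>,
    num_bits: u64,
    num_hashes: u32,
    hashers: [RandomState; 2],
}

impl AtomicBloom {
    fn new(expected_items: usize, bits_per_item: usize, num_hashes: u32) -> Self {
        let num_bits = (expected_items * bits_per_item).max(64) as u64;
        let words = num_bits.div_ceil(64) as usize;
        Self {
            bits: (0..words).map(|_| AtomicU64::new(0)).collect(),
            num_bits,
            num_hashes,
            hashers: [RandomState::new(), RandomState::new()],
        }
    }

    fn base_hashes<T: Hash>(&self, item: &T) -> (u64, u64) {
        let hash_with = |i: usize| {
            let mut hasher = self.hashers[i].build_hasher();
            item.hash(&mut hasher);
            hasher.finish()
        };
        // An odd stride keeps the k probe positions from collapsing onto each other.
        (hash_with(0), hash_with(1) | 1)
    }

    /// Sets the k bits for `item`; `fetch_or` makes concurrent inserts safe without a lock.
    fn insert<T: Hash>(&self, item: &T) {
        let (h1, h2) = self.base_hashes(item);
        for i in 0..u64::from(self.num_hashes) {
            let bit = h1.wrapping_add(i.wrapping_mul(h2)) % self.num_bits;
            self.bits[(bit / 64) as usize].fetch_or(1u64 << (bit % 64), Ordering::Relaxed);
        }
    }

    /// Returns `false` only if `item` was definitely never inserted; `true` may be a false positive.
    fn may_contain<T: Hash>(&self, item: &T) -> bool {
        let (h1, h2) = self.base_hashes(item);
        (0..u64::from(self.num_hashes)).all(|i| {
            let bit = h1.wrapping_add(i.wrapping_mul(h2)) % self.num_bits;
            self.bits[(bit / 64) as usize].load(Ordering::Relaxed) & (1u64 << (bit % 64)) != 0
        })
    }
}

fn main() {
    // Roughly 10 bits per item with 7 probes gives on the order of a 1% false-positive rate.
    let filter = AtomicBloom::new(1_000, 10, 7);
    filter.insert(&"key1");
    assert!(filter.may_contain(&"key1")); // inserted keys are never reported absent
    println!("unseen key possibly present? {}", filter.may_contain(&"unseen"));
}
```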