From f2fea6415020415c3c558b57af6dd0431604c826 Mon Sep 17 00:00:00 2001 From: SF-Zhou Date: Fri, 31 Oct 2025 17:28:37 +0800 Subject: [PATCH 1/2] support merging multiple MassMap shards into one file --- README.md | 4 +- examples/massmap.rs | 47 +++++- src/builder.rs | 348 +++++++++++++++++++++++++++++++++++++++++++- src/lib.rs | 2 +- src/massmap.rs | 4 +- src/meta.rs | 11 +- 6 files changed, 397 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 5fd331c..5605ce4 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ massmap convert -i examples/demo.json -o examples/demo.massmap --bucket-count 32 #> "file_length": 656, #> "entry_count": 47, #> "bucket_count": 32, -#> "empty_buckets": 5, +#> "occupied_bucket_count": 27, #> "meta_offset": 486, #> "meta_length": 170, #> "hash_seed": 0 @@ -80,7 +80,7 @@ massmap info examples/demo.massmap -k 1999 #> "file_length": 656, #> "entry_count": 47, #> "bucket_count": 32, -#> "empty_buckets": 5, +#> "occupied_bucket_count": 27, #> "meta_offset": 486, #> "meta_length": 170, #> "hash_seed": 0 diff --git a/examples/massmap.rs b/examples/massmap.rs index 8d9c055..d55a88f 100644 --- a/examples/massmap.rs +++ b/examples/massmap.rs @@ -2,6 +2,7 @@ use clap::{Parser, Subcommand}; use foldhash::fast::FixedState; use massmap::{ MassMap, MassMapBuilder, MassMapDefaultHashLoader, MassMapHashConfig, MassMapHashLoader, + MassMapMerger, }; use serde_json::Value; use std::fs::File; @@ -13,6 +14,7 @@ fn main() -> Result<()> { match cli.command { Command::Info(args) => run_info(args), Command::Convert(args) => run_convert(args), + Command::Merge(args) => run_merge(args), } } @@ -35,6 +37,8 @@ enum Command { Info(InfoArgs), /// Convert a JSON key-value file into a massmap binary file Convert(ConvertArgs), + /// Merge multiple massmap binary files into a single massmap binary file + Merge(MergeArgs), } #[derive(clap::Args)] @@ -62,15 +66,30 @@ struct ConvertArgs { #[arg(short, long, value_name = "FILE")] output: PathBuf, - /// Optional override for the hash seed + /// Seed value for the hash function #[arg(long, value_name = "SEED", default_value_t = 0)] hash_seed: u64, - /// Optional override for bucket count + /// Number of buckets in the massmap #[arg(long, value_name = "COUNT", default_value_t = 1 << 16)] bucket_count: u64, - /// Optional override for writer buffer size in bytes + /// Buffer size in bytes for writing the massmap file + #[arg(long, value_name = "BYTES", default_value_t = 16 << 20)] + buffer_size: usize, +} + +#[derive(clap::Args)] +struct MergeArgs { + /// Path to the source JSON file containing key-value pairs + #[arg(short, long, value_name = "FILE")] + input: Vec, + + /// Path to the massmap binary file to produce + #[arg(short, long, value_name = "FILE")] + output: PathBuf, + + /// Buffer size in bytes for writing the massmap file #[arg(long, value_name = "BYTES", default_value_t = 16 << 20)] buffer_size: usize, } @@ -207,3 +226,25 @@ fn expect_string(value: Value, index: usize) -> Result { fn invalid_json(message: String) -> Error { Error::new(ErrorKind::InvalidData, message) } + +fn run_merge(args: MergeArgs) -> Result<()> { + let maps = args + .input + .iter() + .map(|path| { + let file = File::open(path)?; + MassMap::::load(file) + }) + .collect::>>()?; + + let writer = File::create(&args.output)?; + let info = MassMapMerger::default() + .with_writer_buffer_size(args.buffer_size) + .merge(&writer, maps)?; + + let json = serde_json::to_string_pretty(&info) + .map_err(|e| Error::other(format!("Failed to format JSON: {e}")))?; + println!("{}", json); + + Ok(()) +} diff --git a/src/builder.rs b/src/builder.rs index 38b57e7..6a5822f 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1,10 +1,15 @@ use std::io::{Error, ErrorKind, Result, Write}; use std::sync::atomic::{AtomicU64, Ordering}; -use std::{hash::BuildHasher, io::BufWriter}; +use std::{ + hash::{BuildHasher, Hash}, + io::BufWriter, +}; + +use serde::Deserialize; use crate::{ - MassMapBucketMeta, MassMapDefaultHashLoader, MassMapHashConfig, MassMapHashLoader, - MassMapHeader, MassMapInfo, MassMapMeta, MassMapWriter, + MassMap, MassMapBucketMeta, MassMapDefaultHashLoader, MassMapHashConfig, MassMapHashLoader, + MassMapHeader, MassMapInfo, MassMapMeta, MassMapReader, MassMapWriter, }; /// Builder type for emitting massmap files from key-value iterators. @@ -27,7 +32,7 @@ pub struct MassMapBuilder { phantom: std::marker::PhantomData, } -impl Default for MassMapBuilder { +impl Default for MassMapBuilder { fn default() -> Self { Self { hash_config: MassMapHashConfig::default(), @@ -40,6 +45,14 @@ impl Default for MassMapBuilder { } } +impl MassMapBuilder { + /// Creates a new default massmap builder with default hash loader. + #[allow(clippy::should_implement_trait)] + pub fn default() -> Self { + ::default() + } +} + impl MassMapBuilder { /// Replaces the entire hash configuration used to distribute keys across buckets. /// @@ -140,11 +153,10 @@ impl MassMapBuilder { entry_count += 1; } - const S: usize = std::mem::size_of::(); let mut bucket_metas: Vec = Vec::with_capacity(self.bucket_count as usize); - let offset = AtomicU64::new(S as u64 * 3); + let offset = AtomicU64::new(MassMapHeader::SIZE as u64); let mut buf_writer = BufWriter::with_capacity( self.writer_buffer_size, MassMapWriterWrapper { @@ -152,7 +164,9 @@ impl MassMapBuilder { offset: &offset, }, ); - for bucket in buckets { + let mut occupied_bucket_count = 0; + let mut occupied_bucket_range = 0..0; + for (i, bucket) in buckets.into_iter().enumerate() { if bucket.is_empty() { bucket_metas.push(MassMapBucketMeta { offset: 0, @@ -162,6 +176,12 @@ impl MassMapBuilder { continue; } + occupied_bucket_count += 1; + if occupied_bucket_range.is_empty() { + occupied_bucket_range.start = i as u64; + } + occupied_bucket_range.end = i as u64 + 1; + let begin_offset = offset.load(Ordering::Relaxed) + buf_writer.buffer().len() as u64; let result = if self.field_names { @@ -190,7 +210,8 @@ impl MassMapBuilder { hash_config: self.hash_config, entry_count, bucket_count: self.bucket_count, - empty_buckets: bucket_metas.iter().filter(|b| b.count == 0).count() as u64, + occupied_bucket_count, + occupied_bucket_range, }; let meta_offset = offset.load(Ordering::Relaxed) + buf_writer.buffer().len() as u64; @@ -232,8 +253,150 @@ impl<'a, W: MassMapWriter> std::io::Write for MassMapWriterWrapper<'a, W> { } } +#[derive(Debug)] +pub struct MassMapMerger { + writer_buffer_size: usize, +} + +impl Default for MassMapMerger { + fn default() -> Self { + Self { + writer_buffer_size: 16 << 20, // 16 MiB + } + } +} + +impl MassMapMerger { + /// Adjusts the capacity of the buffered writer used while streaming data. + pub fn with_writer_buffer_size(mut self, size: usize) -> Self { + self.writer_buffer_size = size; + self + } +} + +impl MassMapMerger { + pub fn merge( + self, + writer: &W, + mut maps: Vec>, + ) -> Result + where + W: MassMapWriter, + K: for<'de> Deserialize<'de> + Eq + Hash, + V: for<'de> Deserialize<'de> + Clone, + { + if maps.is_empty() { + return Err(Error::new( + ErrorKind::InvalidData, + "No massmaps provided for merging", + )); + } + + maps.sort_by_key(|m| m.meta.occupied_bucket_range.start); + + let mut entry_count = 0; + let mut bucket_metas = + vec![MassMapBucketMeta::default(); maps[0].meta.bucket_count as usize]; + let hash_config = maps[0].meta.hash_config.clone(); + let mut occupied_bucket_count = 0; + let mut occupied_bucket_range = 0..0; + let mut global_offset = 0u64; + + for map in &maps { + if map.meta.hash_config != hash_config { + return Err(Error::new( + ErrorKind::InvalidData, + "Incompatible hash configurations between massmaps", + )); + } + if map.meta.bucket_count != bucket_metas.len() as u64 { + return Err(Error::new( + ErrorKind::InvalidData, + "Incompatible bucket counts between massmaps", + )); + } + + if map.meta.entry_count == 0 { + continue; + } + + occupied_bucket_count += map.meta.occupied_bucket_count; + if occupied_bucket_range.is_empty() { + occupied_bucket_range = map.meta.occupied_bucket_range.clone(); + } else if occupied_bucket_range.end <= map.meta.occupied_bucket_range.start { + occupied_bucket_range.end = map.meta.occupied_bucket_range.end; + } else { + return Err(Error::new( + ErrorKind::InvalidData, + "Overlapping occupied bucket ranges between massmaps", + )); + } + + // update bucket metas. + for idx in map.meta.occupied_bucket_range.clone() { + let bucket_meta = &mut bucket_metas[idx as usize]; + *bucket_meta = map.bucket_metas[idx as usize]; + if bucket_meta.count > 0 { + bucket_meta.offset += global_offset; + } + } + entry_count += map.meta.entry_count; + + // copy buckets from reader to writer directly. + let mut current_offset = MassMapHeader::SIZE as u64; + let finished_offset = map.header.meta_offset; + while current_offset < finished_offset { + let chunk = std::cmp::min( + finished_offset - current_offset, + self.writer_buffer_size as u64, + ); + map.reader.read_exact_at(current_offset, chunk, |data| { + writer.write_all_at(data, global_offset + MassMapHeader::SIZE as u64)?; + Ok(()) + })?; + current_offset += chunk; + global_offset += chunk; + } + } + + let meta = MassMapMeta { + hash_config, + entry_count, + bucket_count: bucket_metas.len() as u64, + occupied_bucket_count, + occupied_bucket_range, + }; + + let meta_offset = global_offset + MassMapHeader::SIZE as u64; + let offset = AtomicU64::new(meta_offset); + let mut buf_writer = BufWriter::with_capacity( + self.writer_buffer_size, + MassMapWriterWrapper { + inner: writer, + offset: &offset, + }, + ); + + rmp_serde::encode::write(&mut buf_writer, &(meta.clone(), bucket_metas)) + .map_err(|e| Error::other(format!("Fail to serialize meta: {}", e)))?; + buf_writer.flush()?; + let finished_offset = offset.load(Ordering::Relaxed); + + let meta_length = finished_offset - meta_offset; + let header = MassMapHeader { + meta_offset, + meta_length, + }; + writer.write_all_at(&header.serialize(), 0)?; + + Ok(MassMapInfo { header, meta }) + } +} + #[cfg(test)] mod tests { + use std::{fs::File, hash::Hasher, sync::Arc}; + use crate::*; #[derive(Debug)] @@ -338,4 +501,173 @@ mod tests { let builder = MassMapBuilder::default().with_bucket_count(1); builder.build(&writer, entries).unwrap_err(); } + + pub struct SimpleHasher { + state: u64, + modulo: u64, + } + + impl SimpleHasher { + pub fn new(modulo: u64) -> Self { + SimpleHasher { state: 0, modulo } + } + } + + impl Hasher for SimpleHasher { + fn finish(&self) -> u64 { + self.state % self.modulo + } + + fn write(&mut self, bytes: &[u8]) { + for &byte in bytes.iter().rev() { + self.state = self.state.wrapping_mul(256).wrapping_add(byte as u64); + } + } + } + + struct SimpleBuildHasher { + modulo: u64, + } + + impl std::hash::BuildHasher for SimpleBuildHasher { + type Hasher = SimpleHasher; + + fn build_hasher(&self) -> Self::Hasher { + SimpleHasher::new(self.modulo) + } + } + + struct SimpleHashLoader; + + impl MassMapHashLoader for SimpleHashLoader { + type BuildHasher = SimpleBuildHasher; + + fn load(config: &MassMapHashConfig) -> std::io::Result { + let modulo = config + .parameters + .get("modulo") + .and_then(|v| v.as_u64()) + .unwrap_or(10000); + Ok(SimpleBuildHasher { modulo }) + } + } + + fn create_simple_map( + entries: impl Iterator, + bucket_count: u64, + hash_modulo: u64, + ) -> MassMap { + let writer = MemoryWriter::new(10 << 20); // 10 MiB + let hash_config = MassMapHashConfig { + name: "simplehash".to_string(), + parameters: serde_json::json!({ + "module": hash_modulo + }), + }; + let builder = MassMapBuilder::::default() + .with_bucket_count(bucket_count) + .with_hash_config(hash_config); + builder.build(&writer, entries).unwrap(); + + MassMap::::load(writer).unwrap() + } + + #[test] + fn test_normal_merge() { + let dir = tempfile::tempdir().unwrap(); + + const M: u64 = 10000; + const N: u64 = 100_000; + const P: u64 = 10; + + let mut threads = Vec::with_capacity(P as usize); + for i in 0..P { + threads.push(std::thread::spawn(move || { + let entries = (0..N).filter(|v| (v % M) / (M / P) == i).map(|v| (v, v)); + let map = create_simple_map(entries, M, M); + assert_eq!(map.meta.occupied_bucket_count, M / P); + assert_eq!(map.meta.entry_count, N / P); + assert_eq!(map.meta.occupied_bucket_range.start, (M / P) * i); + + for item in map.iter() { + let (k, v) = item.unwrap(); + assert_eq!(k, v); + } + map + })); + } + + let mut maps = threads + .into_iter() + .map(|t| t.join().unwrap()) + .collect::>(); + maps.push(create_simple_map((0..0).map(|v| (v, v)), M, M)); + + let path = dir.path().join("merge.massmap"); + let writer = std::fs::File::create(&path).unwrap(); + MassMapMerger::default().merge(&writer, maps).unwrap(); + + let reader = std::fs::File::open(&path).unwrap(); + let map = MassMap::::load(reader).unwrap(); + assert_eq!(map.len(), N); + let map = Arc::new(map); + + let mut threads = Vec::with_capacity(P as usize); + for i in 0..P { + const CHUNK: u64 = N / P; + let range = CHUNK * i..CHUNK * (i + 1); + let map = map.clone(); + threads.push(std::thread::spawn(move || { + for v in range { + assert_eq!(map.get(&v).unwrap().unwrap(), v); + } + })); + } + + for thread in threads { + thread.join().unwrap(); + } + } + + #[test] + fn test_invalid_merge() { + // 1. different hash config. + { + let map1 = create_simple_map((0..1000).map(|i| (i, i)), 1024, 10000); + let map2 = create_simple_map((1000..2000).map(|i| (i, i)), 1024, 20000); + let writer = MemoryWriter::new(10 << 20); // 10 MiB + MassMapMerger::default() + .with_writer_buffer_size(1 << 20) + .merge(&writer, vec![map1, map2]) + .unwrap_err(); + } + + // 2. different bucket count. + { + let map1 = create_simple_map((0..1000).map(|i| (i, i)), 1024, 10000); + let map2 = create_simple_map((1000..2000).map(|i| (i, i)), 2048, 10000); + let writer = MemoryWriter::new(10 << 20); // 10 MiB + MassMapMerger::default() + .merge(&writer, vec![map1, map2]) + .unwrap_err(); + } + + // 3. overlapping occupied bucket range. + { + let map1 = create_simple_map((0..1000).map(|i| (i, i)), 1024, 10000); + let map2 = create_simple_map((500..1500).map(|i| (i, i)), 1024, 10000); + let writer = MemoryWriter::new(10 << 20); // 10 MiB + MassMapMerger::default() + .merge(&writer, vec![map1, map2]) + .unwrap_err(); + } + + // 4. empty input. + { + let writer = MemoryWriter::new(10 << 20); // 10 MiB + MassMapMerger::default() + .merge::<_, u64, u64, File, SimpleHashLoader>(&writer, vec![]) + .unwrap_err(); + } + } } diff --git a/src/lib.rs b/src/lib.rs index 9bd8942..7b2ffc3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -45,4 +45,4 @@ mod massmap; pub use massmap::{MassMap, MassMapIter}; mod builder; -pub use builder::MassMapBuilder; +pub use builder::{MassMapBuilder, MassMapMerger}; diff --git a/src/massmap.rs b/src/massmap.rs index 110e982..ab0ab65 100644 --- a/src/massmap.rs +++ b/src/massmap.rs @@ -28,11 +28,11 @@ pub struct MassMap, + pub(crate) bucket_metas: Vec, /// Hash state initialized with the stored seed. build_hasher: H::BuildHasher, /// Reader used to access the backing storage. - reader: R, + pub(crate) reader: R, /// Phantom data to associate key and value types. phantom_data: PhantomData<(K, V)>, } diff --git a/src/meta.rs b/src/meta.rs index 96924dd..1acf6be 100644 --- a/src/meta.rs +++ b/src/meta.rs @@ -1,5 +1,8 @@ use serde::{Deserialize, Serialize}; -use std::io::{Error, ErrorKind, Result}; +use std::{ + io::{Error, ErrorKind, Result}, + ops::Range, +}; use crate::{MAGIC_NUMBER, MassMapHashConfig}; @@ -70,8 +73,10 @@ pub struct MassMapMeta { pub entry_count: u64, /// Number of buckets. pub bucket_count: u64, - /// Number of empty buckets. - pub empty_buckets: u64, + /// Number of non-empty buckets. + pub occupied_bucket_count: u64, + /// Range of non-empty bucket indices. + pub occupied_bucket_range: Range, /// Hash configuration used to derive the [`BuildHasher`](std::hash::BuildHasher) /// when reopening the map. pub hash_config: MassMapHashConfig, From 706182011e3bfb973f59448dabb3edcfb494d2eb Mon Sep 17 00:00:00 2001 From: SF-Zhou Date: Fri, 31 Oct 2025 17:43:20 +0800 Subject: [PATCH 2/2] Update src/builder.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/builder.rs b/src/builder.rs index 6a5822f..bc8e5c4 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -561,7 +561,7 @@ mod tests { let hash_config = MassMapHashConfig { name: "simplehash".to_string(), parameters: serde_json::json!({ - "module": hash_modulo + "modulo": hash_modulo }), }; let builder = MassMapBuilder::::default()