diff --git a/Cargo.lock b/Cargo.lock index 85b0be2bb..473c3b8ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1240,6 +1240,15 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484" +[[package]] +name = "crc32c" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8f48d60e5b4d2c53d5c2b1d8a58c849a70ae5e5509b08a48d047e3b65714a74" +dependencies = [ + "rustc_version", +] + [[package]] name = "crc32fast" version = "1.3.2" @@ -1683,6 +1692,18 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1879,8 +1900,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -2687,6 +2710,15 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom 0.2.10", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -2868,6 +2900,17 @@ dependencies = [ "walkdir", ] +[[package]] +name = "okaywal" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a98d1da023f22a2be795d3c3cb2efaa5f7f1772d3f3995581f95b12f0d2fad3d" +dependencies = [ + "crc32c", + 
"flume", + "parking_lot 0.12.1", +] + [[package]] name = "once_cell" version = "1.18.0" @@ -4567,6 +4610,26 @@ dependencies = [ "tracing", ] +[[package]] +name = "sparrow-journal" +version = "0.11.0" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-ipc", + "arrow-schema", + "derive_more", + "error-stack", + "futures", + "itertools 0.11.0", + "okaywal", + "parking_lot 0.12.1", + "sparrow-batch", + "sparrow-testing", + "tempfile", + "tracing", +] + [[package]] name = "sparrow-kernels" version = "0.11.0" diff --git a/Cargo.toml b/Cargo.toml index 4f7e8f429..0e6f7770a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ arrow-buffer = { version = "47.0.0" } arrow-cast = { version = "47.0.0" } arrow-csv = { version = "47.0.0" } arrow-data = { version = "47.0.0" } +arrow-ipc = { version = "47.0.0" } arrow-json = { version = "47.0.0" } arrow-ord = { version = "47.0.0" } arrow-schema = { version = "47.0.0", features = ["serde"] } diff --git a/crates/sparrow-batch/Cargo.toml b/crates/sparrow-batch/Cargo.toml index 27b2b216f..e187cd6f5 100644 --- a/crates/sparrow-batch/Cargo.toml +++ b/crates/sparrow-batch/Cargo.toml @@ -10,6 +10,7 @@ Defines the main struct for wrapping RecordBatches in execution. """ [features] +default = [] testing = ["proptest"] [dependencies] diff --git a/crates/sparrow-batch/src/batch.rs b/crates/sparrow-batch/src/batch.rs index 794da98be..2a929445d 100644 --- a/crates/sparrow-batch/src/batch.rs +++ b/crates/sparrow-batch/src/batch.rs @@ -17,7 +17,7 @@ use crate::{Error, RowTime}; #[derive(Clone, PartialEq, Debug)] pub struct Batch { /// The data associated with the batch. - pub(crate) data: Option, + pub data: Option, /// An indication that the batch stream has completed up to the given time. /// Any rows in future batches on this stream must have a time strictly @@ -475,6 +475,35 @@ impl Batch { RowTime::from_timestamp_ns(up_to_time), ) } + + /// Creates a batch with the given times and key hashes, and `null` data. 
+ #[cfg(any(test, feature = "testing"))] + pub fn null_from( + time: impl Into, + subsort: impl Into, + key_hash: impl Into, + up_to_time: i64, + ) -> Self { + use arrow_array::NullArray; + + let time: TimestampNanosecondArray = time.into(); + let subsort: UInt64Array = subsort.into(); + let key_hash: UInt64Array = key_hash.into(); + + let time: ArrayRef = Arc::new(time); + let subsort: ArrayRef = Arc::new(subsort); + let key_hash: ArrayRef = Arc::new(key_hash); + + let data = Arc::new(NullArray::new(time.len())); + + Batch::new_with_data( + data, + time, + subsort, + key_hash, + RowTime::from_timestamp_ns(up_to_time), + ) + } } #[cfg(debug_assertions)] @@ -602,13 +631,14 @@ fn validate_key_column( } #[derive(Clone, Debug)] -pub(crate) struct BatchInfo { - pub(crate) data: ArrayRef, - pub(crate) time: ArrayRef, - pub(crate) subsort: ArrayRef, - pub(crate) key_hash: ArrayRef, - min_present_time: RowTime, - max_present_time: RowTime, +#[non_exhaustive] +pub struct BatchInfo { + pub data: ArrayRef, + pub time: ArrayRef, + pub subsort: ArrayRef, + pub key_hash: ArrayRef, + pub min_present_time: RowTime, + pub max_present_time: RowTime, } impl PartialEq for BatchInfo { @@ -709,6 +739,10 @@ impl BatchInfo { pub(crate) fn key_hash(&self) -> &UInt64Array { self.key_hash.as_primitive() } + + pub fn len(&self) -> usize { + self.time.len() + } } #[cfg(any(test, feature = "testing"))] diff --git a/crates/sparrow-batch/src/row_time.rs b/crates/sparrow-batch/src/row_time.rs index 59c33ef73..e36d6edd1 100644 --- a/crates/sparrow-batch/src/row_time.rs +++ b/crates/sparrow-batch/src/row_time.rs @@ -1,4 +1,8 @@ +use std::str::FromStr; + +use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos; use arrow_array::temporal_conversions::timestamp_ns_to_datetime; +use error_stack::{IntoReport, ResultExt}; /// Wrapper around the time of a row. 
/// @@ -26,6 +30,15 @@ impl RowTime { pub fn pred(&self) -> Self { Self(self.0 - 1) } + + pub fn bytes(&self) -> [u8; 8] { + self.0.to_be_bytes() + } + + pub fn from_bytes(bytes: [u8; 8]) -> Self { + let time = i64::from_be_bytes(bytes); + Self(time) + } } impl From for i64 { @@ -39,3 +52,26 @@ impl From for RowTime { RowTime(value) } } + +impl From for String { + fn from(val: RowTime) -> Self { + format!("{val}") + } +} + +#[derive(derive_more::Display, Debug)] +#[display(fmt = "invalid row time: {_0}")] +pub struct ParseError(String); + +impl error_stack::Context for ParseError {} + +impl FromStr for RowTime { + type Err = error_stack::Report; + + fn from_str(s: &str) -> Result { + string_to_timestamp_nanos(s) + .into_report() + .change_context_lazy(|| ParseError(s.to_string())) + .map(Self) + } +} diff --git a/crates/sparrow-journal/Cargo.toml b/crates/sparrow-journal/Cargo.toml new file mode 100644 index 000000000..60d6de968 --- /dev/null +++ b/crates/sparrow-journal/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "sparrow-journal" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true +publish = false +description = """ +Durable journaling of batches. 
+""" + +[dependencies] +arrow-array.workspace = true +arrow-buffer.workspace = true +arrow-ipc.workspace = true +arrow-schema.workspace = true +derive_more.workspace = true +error-stack.workspace = true +futures.workspace = true +itertools.workspace = true +okaywal = "0.3.0" +parking_lot.workspace = true +sparrow-batch = { path = "../sparrow-batch" } +tracing.workspace = true + +[dev-dependencies] +sparrow-batch = { path = "../sparrow-batch", features = ["testing"] } +sparrow-testing = { path = "../sparrow-testing" } +tempfile.workspace = true + +[lib] +doctest = false diff --git a/crates/sparrow-journal/src/batch_io.rs b/crates/sparrow-journal/src/batch_io.rs new file mode 100644 index 000000000..8da1e741b --- /dev/null +++ b/crates/sparrow-journal/src/batch_io.rs @@ -0,0 +1,216 @@ +use std::collections::HashMap; +use std::io::Write; + +use arrow_array::{ArrayRef, RecordBatch}; +use arrow_buffer::Buffer; +use arrow_ipc::writer::{DictionaryTracker, EncodedData, IpcDataGenerator, IpcWriteOptions}; +use arrow_schema::SchemaRef; +use error_stack::{IntoReport, ResultExt}; +use itertools::Itertools; +use sparrow_batch::{Batch, RowTime}; + +use crate::error::Error; + +/// Write batches to a byte array. +/// +/// This uses arrow_ipc::writer::IpcDataGenerator directly. This allows it to avoid +/// copying buffers between the underlying encoded buffer and the stream being written. +pub(super) struct BatchEncoder { + /// Track the dictionary entries in the current segment. + dictionary_tracker: DictionaryTracker, + /// Generator for encoded batches. 
+ gen: IpcDataGenerator, + write_options: IpcWriteOptions, + schema: SchemaRef, +} + +impl std::fmt::Debug for BatchEncoder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BatchEncoder") + .field("gen", &self.gen) + .field("write_options", &self.write_options) + .field("schema", &self.schema) + .finish_non_exhaustive() + } +} + +pub(super) struct Encoded { + up_to_time: RowTime, + dictionary_batches: Vec, + batch: Option, +} + +impl Encoded { + pub(super) fn write_to_journal( + self, + wal: &mut okaywal::WriteAheadLog, + ) -> std::io::Result { + let mut entry = wal.begin_entry()?; + entry.write_chunk(&self.up_to_time.bytes())?; + for dictionary_batch in self.dictionary_batches { + entry.write_chunk(&dictionary_batch.ipc_message)?; + entry.write_chunk(&dictionary_batch.arrow_data)?; + } + if let Some(batch) = self.batch { + entry.write_chunk(&batch.ipc_message)?; + entry.write_chunk(&batch.arrow_data)?; + } + entry.commit() + } +} + +impl BatchEncoder { + pub fn new(schema: SchemaRef) -> error_stack::Result { + Ok(Self { + dictionary_tracker: DictionaryTracker::new(false), + gen: IpcDataGenerator {}, + write_options: IpcWriteOptions::default(), + schema, + }) + } + + pub fn encode(&mut self, batch: &Batch) -> error_stack::Result { + // TODO: Split batches to fit in a certain size. + let up_to_time = batch.up_to_time; + + // Create a record batch for the given batch. 
+ if let Some(batch) = &batch.data { + let batch = RecordBatch::try_new( + self.schema.clone(), + vec![ + batch.time.clone(), + batch.subsort.clone(), + batch.key_hash.clone(), + batch.data.clone(), + ], + ) + .into_report() + .change_context(Error::WriteBatch)?; + let (dictionary_batches, batch) = self + .gen + .encoded_batch(&batch, &mut self.dictionary_tracker, &self.write_options) + .into_report() + .change_context(Error::WriteBatch)?; + Ok(Encoded { + up_to_time, + dictionary_batches, + batch: Some(batch), + }) + } else { + Ok(Encoded { + up_to_time, + dictionary_batches: vec![], + batch: None, + }) + } + } +} + +#[derive(Debug)] +pub(super) struct BatchDecoder { + schema: SchemaRef, + + /// Optional dictionaries for each schema field. + /// + /// Dictionaries may be appended to in the streaming format. + dictionaries_by_id: HashMap, +} + +impl BatchDecoder { + pub fn new(schema: SchemaRef) -> Self { + Self { + schema, + dictionaries_by_id: HashMap::new(), + } + } + + /// Decode the batch from the given entry. + /// + /// If it was completely written, will return `Some(batch)`. If it wasn't completely + /// written, will return `None`. + pub fn decode( + &mut self, + entry: &mut okaywal::Entry<'_>, + ) -> error_stack::Result, Error> { + // Read the chunks. + let Some(chunks) = entry + .read_all_chunks() + .into_report() + .change_context(Error::ReadBatch)? 
+ else { + return Ok(None); + }; + + let mut chunks = chunks.into_iter(); + + let up_to_time = chunks.next().expect("at least one entry"); + let up_to_time = RowTime::from_bytes(up_to_time.try_into().expect("row time")); + + let mut chunk_pairs = chunks.tuples().peekable(); + while let Some((metadata, batch)) = chunk_pairs.next() { + let message = decode_message(&metadata)?; + error_stack::ensure!( + message.bodyLength() as usize == batch.len(), + Error::ReadBatch + ); + + let buf = Buffer::from(batch); + + if chunk_pairs.peek().is_some() { + error_stack::ensure!( + message.header_type() == arrow_ipc::MessageHeader::DictionaryBatch, + Error::ReadBatch + ); + + let batch = message.header_as_dictionary_batch().unwrap(); + arrow_ipc::reader::read_dictionary( + &buf, + batch, + &self.schema, + &mut self.dictionaries_by_id, + &message.version(), + ) + .into_report() + .change_context(Error::ReadBatch)?; + } else { + error_stack::ensure!( + message.header_type() == arrow_ipc::MessageHeader::RecordBatch, + Error::ReadBatch + ); + let batch = message.header_as_record_batch().unwrap(); + let batch = arrow_ipc::reader::read_record_batch( + &buf, + batch, + self.schema.clone(), + &self.dictionaries_by_id, + None, + &message.version(), + ) + .into_report() + .change_context(Error::ReadBatch)?; + + return Ok(Some(Batch::new_with_data( + batch.column(3).clone(), + batch.column(0).clone(), + batch.column(1).clone(), + batch.column(2).clone(), + up_to_time, + ))); + } + } + + // If we get here, we must not have had any batches above. + Ok(Some(Batch::new_empty(up_to_time))) + } +} + +fn decode_message(buf: &[u8]) -> error_stack::Result, Error> { + arrow_ipc::root_as_message(buf) + .map_err(|e| { + // We'd like to convert the flatbuffer error directly. + // But, it only implements `std::error::Error` if the `std` feature + // is enabled, which Arrow doesn't use. 
+ error_stack::report!(Error::ReadBatch).attach_printable(e) + }) + .change_context(Error::ReadBatch) +} diff --git a/crates/sparrow-journal/src/checkpoints.rs b/crates/sparrow-journal/src/checkpoints.rs new file mode 100644 index 000000000..07c762166 --- /dev/null +++ b/crates/sparrow-journal/src/checkpoints.rs @@ -0,0 +1,254 @@ +use std::path::PathBuf; +use std::str::FromStr; + +use arrow_array::cast::AsArray; +use arrow_array::types::{Int64Type, TimestampNanosecondType}; +use arrow_array::RecordBatch; +use arrow_schema::SchemaRef; +use error_stack::{IntoReport, ResultExt}; +use futures::stream::BoxStream; +use futures::{StreamExt, TryStreamExt}; +use sparrow_batch::{Batch, RowTime}; + +use crate::error::Error; + +/// Manages the checkpointed files. +/// +/// We could use Parquet files for check-pointing but we instead use +/// Arrow IPC files, since we believe they should be easier to read. +/// +/// TODO: Support checkpoints on S3. +#[derive(Debug)] +pub(super) struct Checkpoints { + checkpoint_dir: PathBuf, + checkpoints: Vec, + schema: SchemaRef, +} + +#[derive(Debug)] +struct CheckpointFile { + file_name: String, + min_present_time: RowTime, + max_present_time: RowTime, +} + +/// Restore metadata of checkpoints by scanning entries in the directory. +/// +/// TODO: In the future, we should store checkpoint metadata in the state DB +/// and do this recovery from that. +fn recover_checkpoints( + checkpoint_path: &std::path::Path, + schema: &SchemaRef, +) -> error_stack::Result, Error> { + let mut checkpoints = vec![]; + if checkpoint_path.exists() { + error_stack::ensure!( + checkpoint_path.is_dir(), + Error::CheckpointDirNotDir(checkpoint_path.to_path_buf()) + ); + } else { + std::fs::create_dir_all(checkpoint_path) + .into_report() + .change_context(Error::RecoveringCheckpoints)?; + } + + for file in std::fs::read_dir(checkpoint_path) + .into_report() + .change_context(Error::RecoveringCheckpoints)? 
+ { + let file = file + .into_report() + .change_context(Error::RecoveringCheckpoints)?; + let file_type = file + .file_type() + .into_report() + .change_context(Error::RecoveringCheckpoints)?; + if !file_type.is_file() || file.path().extension() != Some("arrow".as_ref()) { + continue; + } + + let reader = std::fs::File::open(file.path()) + .into_report() + .change_context(Error::RecoveringCheckpoints)?; + let reader = arrow_ipc::reader::FileReader::try_new(reader, None) + .into_report() + .change_context(Error::RecoveringCheckpoints)?; + error_stack::ensure!(&reader.schema() == schema, Error::RecoveringCheckpoints); + + let file_name = file + .file_name() + .into_string() + .map_err(|_| Error::RecoveringCheckpoints)?; + + let min_present_time = reader + .custom_metadata() + .get("min_present_time") + .ok_or(Error::RecoveringCheckpoints)?; + let min_present_time = + RowTime::from_str(min_present_time).change_context(Error::RecoveringCheckpoints)?; + + let max_present_time = reader + .custom_metadata() + .get("max_present_time") + .ok_or(Error::RecoveringCheckpoints)?; + let max_present_time = + RowTime::from_str(max_present_time).change_context(Error::RecoveringCheckpoints)?; + + checkpoints.push(CheckpointFile { + file_name, + min_present_time, + max_present_time, + }) + } + + Ok(checkpoints) +} + +impl Checkpoints { + pub fn try_new(checkpoint_dir: PathBuf, schema: SchemaRef) -> error_stack::Result { + let checkpoints = recover_checkpoints(&checkpoint_dir, &schema)?; + tracing::info!( + "Recovered {} checkpoints from {}", + checkpoints.len(), + checkpoint_dir.display() + ); + Ok(Self { + checkpoint_dir, + checkpoints, + schema, + }) + } + + /// Create a check-point from the given batches. + /// + /// This method assumes the batches are non-overlapping and ordered by time. 
+ pub fn checkpoint( + &mut self, + batches: Vec, + up_to_time: RowTime, + ) -> error_stack::Result<(), Error> { + let batch = Batch::concat(batches, up_to_time).change_context(Error::WriteCheckpoint)?; + tracing::info!("Checkpointing {} rows", batch.num_rows()); + + if let Some(batch) = batch.data { + let file_name = format!("checkpoint_{}.arrow", self.checkpoints.len()); + let rows = batch.len(); + + // Unfortunately, this writes the schema in each checkpoint file. + // We could use arrow_ipc primitives directly (like we do in batch_io) + // but we'd like to be able to write directly to the file, and it seems + // easier to rely on Arrow for this. Given that this is only happening + // per checkpoint, it should be OK (and maybe a useful sanity check). + + let writer = std::fs::File::create(self.checkpoint_dir.join(&file_name)) + .into_report() + .change_context(Error::WriteCheckpoint)?; + let mut writer = arrow_ipc::writer::FileWriter::try_new(writer, self.schema.as_ref()) + .into_report() + .change_context(Error::WriteCheckpoint)?; + writer.write_metadata("min_present_time", batch.min_present_time); + writer.write_metadata("max_present_time", batch.max_present_time); + let record_batch = RecordBatch::try_new( + self.schema.clone(), + vec![batch.time, batch.subsort, batch.key_hash, batch.data], + ) + .into_report() + .change_context(Error::WriteCheckpoint)?; + + writer + .write(&record_batch) + .into_report() + .change_context(Error::WriteCheckpoint)?; + + writer + .finish() + .into_report() + .change_context(Error::WriteCheckpoint)?; + + tracing::info!("Checkpointed {rows} jouranl rows to {file_name}"); + + // Record the new checkpoint file. + self.checkpoints.push(CheckpointFile { + file_name, + min_present_time: batch.min_present_time, + max_present_time: batch.max_present_time, + }); + } + + Ok(()) + } + + /// Create a merged read stream from all the checkpoint files. 
+ /// + /// Currently, this assumes that the checkpoints are non-overlapping, which is true + /// given the assumption of no late data. + /// + /// TODO: Support projection to checkpoint reads. + pub fn read_checkpoints( + &mut self, + ) -> error_stack::Result>, Error> { + // Sort the checkpoints by min present time. + // Since we don't allow any overlap currently, this means we can just + // read the checkpoints in order. + // + // When we support overlap (and merging), this will still reflect the order + // that files need to be opened, and can be used to limit the set of + // active files at any point in time. + + self.checkpoints.sort_by_key(|c| c.min_present_time); + + // Clone the checkpoints to create a static iterator. + let checkpoints: Vec<_> = self + .checkpoints + .iter() + .map(|c| &c.file_name) + .cloned() + .collect(); + let checkpoint_dir = self.checkpoint_dir.clone(); + + Ok(futures::stream::iter(checkpoints) + .map(move |file_name| checkpoint_dir.join(file_name)) + .map(read_checkpoint_batches) + .try_flatten() + .boxed()) + } +} + +fn read_checkpoint_batches( + checkpoint_path: PathBuf, +) -> error_stack::Result>, Error> { + let reader = std::fs::File::open(checkpoint_path) + .into_report() + .change_context(Error::ReadingCheckpoint)?; + let reader = arrow_ipc::reader::FileReader::try_new(reader, None) + .into_report() + .change_context(Error::ReadingCheckpoint)?; + + Ok(futures::stream::try_unfold(reader, |mut reader| async { + // This isn't really async -- it actively reads the next batch. 
+ if let Some(batch) = reader.next() { + let batch = batch + .into_report() + .change_context(Error::ReadingCheckpoint)?; + + let time = batch.column(0).clone(); + let subsort = batch.column(1).clone(); + let key_hash = batch.column(2).clone(); + let data = batch.column(3).clone(); + + let up_to_time = *time + .as_primitive::() + .values() + .last() + .expect("non-empty"); + + let batch = + Batch::new_with_data(data, time, subsort, key_hash, RowTime::from(up_to_time)); + + Ok(Some((batch, reader))) + } else { + Ok(None) + } + }) + .boxed()) +} diff --git a/crates/sparrow-journal/src/error.rs b/crates/sparrow-journal/src/error.rs new file mode 100644 index 000000000..40a96702a --- /dev/null +++ b/crates/sparrow-journal/src/error.rs @@ -0,0 +1,27 @@ +#[derive(derive_more::Display, Debug)] +pub enum Error { + #[display(fmt = "checkpoint path ({}) is not a directory", "_0.display()")] + CheckpointDirNotDir(std::path::PathBuf), + #[display(fmt = "error creating writer")] + CreatingWriter, + #[display(fmt = "error recovering write-ahead-log")] + Recovering, + #[display(fmt = "error recovering checkpoints")] + RecoveringCheckpoints, + #[display(fmt = "error creating entry in write-ahead-log")] + CreatingEntry, + #[display(fmt = "error writing batch")] + WriteBatch, + #[display(fmt = "error concatenating batch")] + Concatenate, + #[display(fmt = "error reading batch")] + ReadBatch, + #[display(fmt = "error writing checkpoint")] + WriteCheckpoint, + #[display(fmt = "error requesting checkpoint")] + Checkpointing, + #[display(fmt = "error reading checkpoint")] + ReadingCheckpoint, +} + +impl error_stack::Context for Error {} diff --git a/crates/sparrow-journal/src/lib.rs b/crates/sparrow-journal/src/lib.rs new file mode 100644 index 000000000..2dbe4062f --- /dev/null +++ b/crates/sparrow-journal/src/lib.rs @@ -0,0 +1,403 @@ +#![warn( + rust_2018_idioms, + nonstandard_style, + future_incompatible, + clippy::mod_module_files, + clippy::print_stdout, + clippy::print_stderr, + 
clippy::undocumented_unsafe_blocks +)] + +//! Journaling of Batches. + +use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use futures::stream::BoxStream; +use futures::StreamExt; +use parking_lot::Mutex; +use sparrow_batch::{Batch, RowTime}; +use std::sync::Arc; + +use error_stack::{IntoReport, ResultExt}; + +use crate::batch_io::{BatchDecoder, BatchEncoder}; +use crate::checkpoints::Checkpoints; +use crate::error::Error; + +mod batch_io; +mod checkpoints; +mod error; + +/// A journal of batches. +/// +/// This may be used by sources to ensure batches are durably stored before +/// acknowledging new data. +/// +/// The journal consists of two parts -- append only "write-ahead logs" (WALs) +/// and "checkpoints". The WALs are used to store new batches as they arrive. +/// The batches are appended in arrival time order. When a segment of the WAL +/// exceeds a certain size a new segment is started and the previous segment +/// is checkpointed. +/// +/// The journal is built on `okaywal` which automatically manages the active +/// segments and rollovers. As batches are added to the active segment, they +/// are also added to an in-memory view of the "current". This allows for a +/// quicker read of recent data. +pub struct BatchJournal { + wal: okaywal::WriteAheadLog, + batches: Arc, + encoder: BatchEncoder, +} + +impl BatchJournal { + /// Create (or recover) a journal from the given path. 
+ pub fn recover( + directory: &std::path::Path, + data_type: DataType, + ) -> error_stack::Result { + let schema = data_type_to_schema(data_type); + + let checkpoint_dir = directory.join("checkpoints"); + let batches = Arc::new(Batches::try_new(schema.clone(), checkpoint_dir)?); + let manager = BatchCheckpointer { + batches: batches.clone(), + decoder: BatchDecoder::new(schema.clone()), + }; + let wal = okaywal::WriteAheadLog::recover(directory, manager) + .into_report() + .change_context(Error::Recovering)?; + tracing::info!( + "Recovered journal from {} with {} rows", + directory.display(), + batches.batches.lock().0.len() + ); + + let encoder = BatchEncoder::new(schema)?; + Ok(Self { + wal, + batches, + encoder, + }) + } + + /// Add a batch to the journal. + /// + /// When this returns (successfully), the batch will have been durably + /// committed to the journal. + pub fn journal(&mut self, batch: Batch) -> error_stack::Result<(), Error> { + let num_rows = batch.num_rows(); + let encoded = self.encoder.encode(&batch)?; + let entry_id = encoded + .write_to_journal(&mut self.wal) + .into_report() + .change_context(Error::WriteBatch)?; + tracing::trace!("Journaled batch with {num_rows} rows to {entry_id:?}"); + + self.batches.add_batch(batch)?; + + Ok(()) + } + + /// Return a stream over the current contents. + pub fn stream_current( + &self, + ) -> error_stack::Result>, Error> { + self.batches.stream_current() + } + + /// Return a stream over the current contents and subscribed to future additions. + pub fn stream_subscribe( + &self, + ) -> error_stack::Result>, Error> { + self.batches.stream_subscribe() + } + + #[cfg(test)] + /// Cause a checkpoint to be taken. + fn checkpoint(&self) -> error_stack::Result<(), Error> { + self.wal + .checkpoint_active() + .into_report() + .change_context(Error::Checkpointing) + } +} + +/// Convert a DataType to a SchemaRef. 
+fn data_type_to_schema(data: DataType) -> SchemaRef { + let mut fields = Vec::with_capacity(4); + fields.extend_from_slice(&[ + Field::new( + "_time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + Field::new("_subsort", DataType::UInt64, false), + Field::new("_key_hash", DataType::UInt64, false), + ]); + fields.push(Field::new("data", data, true)); + Arc::new(Schema::new(fields)) +} + +/// The current in-memory batches. +#[derive(Debug)] +struct Batches { + batches: Mutex<(Vec, RowTime)>, + checkpoints: Mutex, +} + +impl Batches { + fn try_new( + schema: SchemaRef, + checkpoint_dir: std::path::PathBuf, + ) -> error_stack::Result { + Ok(Self { + batches: Mutex::new((vec![], RowTime::ZERO)), + checkpoints: Mutex::new(Checkpoints::try_new(checkpoint_dir, schema)?), + }) + } + + fn add_batch(&self, batch: Batch) -> error_stack::Result<(), Error> { + let mut lock = self.batches.lock(); + batch.max_present_time(); + assert!( + lock.1 <= batch.up_to_time, + "Expected times to be increasing, but got {} and {}", + lock.1, + batch.up_to_time, + ); + lock.1 = batch.up_to_time; + lock.0.push(batch); + + Ok(()) + } + + fn checkpoint(&self) -> error_stack::Result<(), Error> { + // Acquire the batch lock first, and hold it until we complete. + // This ensures no batches can be acknowledged while we are in the + // process of checkpointing. + // + // This may cause new inputs to pause while checkpoints are taken. + // If this becomes a problem, we could look at a more sophisticated + // strategy -- for instance, we could first take the batches and concatenate + // them into a "pending-checkpoint", at which point we could release the lock. + // Then, once the checkpoint completes, we could acquire the lock and drop the + // pending checkpoint batch (since it is now available in the checkpoint). 
+ let mut lock = self.batches.lock(); + let batches = std::mem::take(&mut lock.0); + self.checkpoints.lock().checkpoint(batches, lock.1)?; + Ok(()) + } + + fn in_memory_stream( + &self, + ) -> error_stack::Result>, Error> { + // This clones the vec to create an owned iterator without needing to hold the lock. + // Cloning the vec (and batches) shouldn't be too problematic given their expected + // size. If they are, we could concatenate them into a single batch, updating the + // in-memory set and then only cloning that one batch. + Ok(futures::stream::iter(self.batches.lock().0.clone()) + .map(Ok) + .boxed()) + } + + /// Return a stream over the current contents. + fn stream_current( + &self, + ) -> error_stack::Result>, Error> { + // Since we don't currently accept late data, we don't need to merge with the checkpoints. + // Instead, we can just chain at the end. + let checkpoint_stream = self.checkpoints.lock().read_checkpoints()?; + let in_memory_stream = self.in_memory_stream()?; + Ok(checkpoint_stream.chain(in_memory_stream).boxed()) + } + + /// Return a stream over the current contents and subscribed to future additions. + fn stream_subscribe( + &self, + ) -> error_stack::Result>, Error> { + // TODO: Merge with checkpoints, and subscribe to future changes. 
+ self.stream_current() + } +} + +#[derive(Debug)] +struct BatchCheckpointer { + batches: Arc, + decoder: BatchDecoder, +} + +impl okaywal::LogManager for BatchCheckpointer { + fn recover(&mut self, entry: &mut okaywal::Entry<'_>) -> std::io::Result<()> { + // DO NOT SUBMIT: Handle errors + if let Some(batch) = self.decoder.decode(entry).unwrap() { + self.batches.add_batch(batch).unwrap(); + } + Ok(()) + } + + fn checkpoint_to( + &mut self, + _last_checkpointed_id: okaywal::EntryId, + _checkpointed_entries: &mut okaywal::SegmentReader, + _wal: &okaywal::WriteAheadLog, + ) -> std::io::Result<()> { + // TODO: This should use `_last_checkpointed_id` to ensure that only batches + // that are part of the checkpoint are included. + self.batches + .checkpoint() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) + } +} + +#[cfg(test)] +mod tests { + use std::marker::PhantomData; + use std::ops::DerefMut; + + use futures::{Stream, TryStreamExt}; + + use super::*; + + struct JournalTester { + tempdir: tempfile::TempDir, + data_type: DataType, + } + + impl JournalTester { + fn new(data_type: DataType) -> Self { + sparrow_testing::init_test_logging(); + + let tempdir = tempfile::Builder::new() + .prefix("journal") + .tempdir() + .unwrap(); + Self { tempdir, data_type } + } + + fn journal(&mut self) -> GuardedJournal<'_> { + let journal = + BatchJournal::recover(self.tempdir.path(), self.data_type.clone()).unwrap(); + GuardedJournal { + journal: Some(journal), + phantom: PhantomData, + } + } + } + + struct GuardedJournal<'a> { + journal: Option, + phantom: PhantomData<&'a ()>, + } + + impl<'a> std::ops::Deref for GuardedJournal<'a> { + type Target = BatchJournal; + fn deref(&self) -> &Self::Target { + self.journal.as_ref().expect("not yet dropped") + } + } + + impl<'a> DerefMut for GuardedJournal<'a> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.journal.as_mut().expect("not yet dropped") + } + } + + impl<'a> Drop for GuardedJournal<'a> { + fn 
drop(&mut self) { + // Make sure the journal shuts down and checkpoints finish, etc. + let journal = self.journal.take().expect("not yet dropped"); + journal.wal.shutdown().unwrap(); + } + } + + fn batch_from_stream(stream: impl Stream>) -> Batch { + let batches: Vec<_> = futures::executor::block_on(stream.try_collect()).unwrap(); + let up_to_time = batches + .last() + .map(|last| last.up_to_time) + .unwrap_or_default(); + Batch::concat(batches, up_to_time).unwrap() + } + + #[test] + fn test_journal_one() { + let mut tester = JournalTester::new(DataType::Null); + + let batch = Batch::null_from(vec![0, 1, 2], vec![10, 11, 12], vec![2, 3, 4], 2); + + { + let mut journal = tester.journal(); + journal.journal(batch.clone()).unwrap(); + + assert_eq!(batch_from_stream(journal.stream_current().unwrap()), batch) + } + } + + #[test] + fn test_journal_one_and_recover() { + let mut tester = JournalTester::new(DataType::Null); + + let batch = Batch::null_from(vec![0, 1, 2], vec![10, 11, 12], vec![2, 3, 4], 2); + + { + let mut journal = tester.journal(); + journal.journal(batch.clone()).unwrap(); + } + + { + let recovered = tester.journal(); + assert_eq!( + batch_from_stream(recovered.stream_current().unwrap()), + batch + ); + } + } + + #[test] + fn test_journal_two_and_recover() { + let mut tester = JournalTester::new(DataType::Null); + + { + let mut journal = tester.journal(); + journal + .journal(Batch::null_from(vec![0, 1], vec![10, 11], vec![2, 3], 1)) + .unwrap(); + + journal + .journal(Batch::null_from(vec![2, 3], vec![10, 11], vec![2, 2], 3)) + .unwrap(); + } + + { + let recovered = tester.journal(); + assert_eq!( + batch_from_stream(recovered.stream_current().unwrap()), + Batch::null_from(vec![0, 1, 2, 3], vec![10, 11, 10, 11], vec![2, 3, 2, 2], 3) + ); + } + } + + #[test] + fn test_journal_and_checkpoint_and_recover() { + let mut tester = JournalTester::new(DataType::Null); + + { + let mut journal = tester.journal(); + journal + .journal(Batch::null_from(vec![0, 1], 
vec![10, 11], vec![2, 3], 1)) + .unwrap(); + + journal + .journal(Batch::null_from(vec![2, 3], vec![10, 11], vec![2, 2], 3)) + .unwrap(); + journal.checkpoint().unwrap(); + } + + { + let recovered = tester.journal(); + assert_eq!( + batch_from_stream(recovered.stream_current().unwrap()), + Batch::null_from(vec![0, 1, 2, 3], vec![10, 11, 10, 11], vec![2, 3, 2, 2], 3) + ); + } + } +} diff --git a/crates/sparrow-runtime/Cargo.toml b/crates/sparrow-runtime/Cargo.toml index 6880a7407..288410dbd 100644 --- a/crates/sparrow-runtime/Cargo.toml +++ b/crates/sparrow-runtime/Cargo.toml @@ -52,7 +52,7 @@ sparrow-arrow = { path = "../sparrow-arrow" } sparrow-compiler = { path = "../sparrow-compiler" } sparrow-core = { path = "../sparrow-core" } sparrow-instructions = { path = "../sparrow-instructions" } -sparrow-merge = { path = "../sparrow-merge", features = ["testing"] } +sparrow-merge = { path = "../sparrow-merge" } sparrow-qfr = { path = "../sparrow-qfr" } static_init.workspace = true tempfile.workspace = true @@ -71,6 +71,7 @@ proptest.workspace = true rand.workspace = true serde_json.workspace = true postcard.workspace = true +sparrow-merge = { path = "../sparrow-merge", features = ["testing"] } sparrow-testing = { path = "../sparrow-testing" } tempfile.workspace = true