diff --git a/cerbero/build/build.py b/cerbero/build/build.py index 23eadbfee..89b1a08a4 100644 --- a/cerbero/build/build.py +++ b/cerbero/build/build.py @@ -1342,7 +1342,7 @@ def __init__(self): def get_cargoc_args(self): cargoc_args = [ - '--release', '--frozen', + '--release', '--prefix', self.config.prefix, '--libdir', self.config.libdir, ] diff --git a/recipes/gst-plugins-rs.recipe b/recipes/gst-plugins-rs.recipe index 6b3918079..427a3eecb 100644 --- a/recipes/gst-plugins-rs.recipe +++ b/recipes/gst-plugins-rs.recipe @@ -82,6 +82,8 @@ class Recipe(recipe.Recipe): deps = ['gstreamer-1.0', 'gst-plugins-base-1.0', 'pango', 'cairo', 'gst-plugins-bad-1.0', 'dav1d'] + patches = ['gst-plugins-rs/0001-awss3sink-soft-seek.patch'] + def enable_plugin(self, name, category): if self.library_type in (LibraryType.SHARED, LibraryType.BOTH): attr = f'files_plugins_{category}' diff --git a/recipes/gst-plugins-rs/0001-awss3sink-soft-seek.patch b/recipes/gst-plugins-rs/0001-awss3sink-soft-seek.patch new file mode 100644 index 000000000..66efd8412 --- /dev/null +++ b/recipes/gst-plugins-rs/0001-awss3sink-soft-seek.patch @@ -0,0 +1,1800 @@ +From 1dee50dc59111b4a184860812eee013623e268d2 Mon Sep 17 00:00:00 2001 +From: Thomas Goodwin +Date: Fri, 17 May 2024 13:20:51 -0400 +Subject: [PATCH 1/9] awss3sink: changed scope of aws max parts constant + +Moved this to the outer scope with the rest of the +constants so that it too may be used elsewhere. 
+ +Signed-off-by: Thomas Goodwin +--- + net/aws/src/s3sink/multipartsink.rs | 6 +++--- + 1 file changed, 3 insertions(+), 3 deletions(-) + +diff --git a/net/aws/src/s3sink/multipartsink.rs b/net/aws/src/s3sink/multipartsink.rs +index 5f2755b7..fb73d865 100644 +--- a/net/aws/src/s3sink/multipartsink.rs ++++ b/net/aws/src/s3sink/multipartsink.rs +@@ -53,6 +53,9 @@ const DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC: u64 = 60_000; + const DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC: u64 = 600_000; // 10 minutes + const DEFAULT_COMPLETE_RETRY_DURATION_MSEC: u64 = 3_600_000; // 60 minutes + ++// https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html ++const MAX_MULTIPART_NUMBER: i64 = 10000; ++ + struct Started { + client: Client, + buffer: Vec, +@@ -73,9 +76,6 @@ impl Started { + } + + pub fn increment_part_number(&mut self) -> Result { +- // https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html +- const MAX_MULTIPART_NUMBER: i64 = 10000; +- + if self.part_number > MAX_MULTIPART_NUMBER { + return Err(gst::error_msg!( + gst::ResourceError::Failed, +-- +2.34.1 + + +From 7c2a1d00cbbded25864db21d564da371bbfce33c Mon Sep 17 00:00:00 2001 +From: Thomas Goodwin +Date: Wed, 22 May 2024 13:02:58 -0400 +Subject: [PATCH 2/9] awss3sink: buffer size to usize (32 vs. 64-bit) + +Since tier-1 archs for rust are 32 and 64-bit, making the +assertion here that one could address an offset in the +local part buffer (Vec type) with a 64-bit address can +cause problems. Using usize on the other hand allows for +flexibility for 32-bit architectures as long as the +maximum for the gobject property is also limited according +to the host architecture (or enabled to be full-size on +64-bit). 
+ +Signed-off-by: Thomas Goodwin +--- + net/aws/src/s3sink/multipartsink.rs | 26 ++++++++++++++++++-------- + 1 file changed, 18 insertions(+), 8 deletions(-) + +diff --git a/net/aws/src/s3sink/multipartsink.rs b/net/aws/src/s3sink/multipartsink.rs +index fb73d865..1d0ea7c6 100644 +--- a/net/aws/src/s3sink/multipartsink.rs ++++ b/net/aws/src/s3sink/multipartsink.rs +@@ -39,7 +39,6 @@ use super::OnError; + + const DEFAULT_FORCE_PATH_STYLE: bool = false; + const DEFAULT_RETRY_ATTEMPTS: u32 = 5; +-const DEFAULT_BUFFER_SIZE: u64 = 5 * 1024 * 1024; + const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing; + + // General setting for create / abort requests +@@ -105,7 +104,7 @@ struct Settings { + key: Option, + content_type: Option, + content_disposition: Option, +- buffer_size: u64, ++ buffer_size: usize, + access_key: Option, + secret_access_key: Option, + session_token: Option, +@@ -164,7 +163,7 @@ impl Default for Settings { + secret_access_key: None, + session_token: None, + metadata: None, +- buffer_size: DEFAULT_BUFFER_SIZE, ++ buffer_size: 0, + retry_attempts: DEFAULT_RETRY_ATTEMPTS, + multipart_upload_on_error: DEFAULT_MULTIPART_UPLOAD_ON_ERROR, + request_timeout: Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC), +@@ -655,6 +654,16 @@ impl ObjectImpl for S3Sink { + + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { ++ // AWS min/max is 5 MB -> 5GB ++ // Rust Vec, used in this module, has a maximum size of usize, which is ++ // tied to the system architecture. Per this, all tier 1 architectures ++ // are 32 or 64-bit, so the 5 MB minimum is no big deal: ++ // https://doc.rust-lang.org/nightly/rustc/platform-support.html ++ // ++ // However the 5 GB max would exceed 32-bit architectures. 
++ let min_buffer_size: u64 = 5 * 1024_u64.pow(2); ++ let max_buffer_size: u64 = std::cmp::min(std::usize::MAX as u64, 5 * 1024_u64.pow(3)); ++ let default_buffer_size: u64 = min_buffer_size; + vec![ + glib::ParamSpecString::builder("bucket") + .nick("S3 Bucket") +@@ -675,9 +684,10 @@ impl ObjectImpl for S3Sink { + glib::ParamSpecUInt64::builder("part-size") + .nick("Part size") + .blurb("A size (in bytes) of an individual part used for multipart upload.") +- .minimum(5 * 1024 * 1024) // 5 MB +- .maximum(5 * 1024 * 1024 * 1024) // 5 GB +- .default_value(DEFAULT_BUFFER_SIZE) ++ .minimum(min_buffer_size) ++ .maximum(max_buffer_size) ++ .default_value(default_buffer_size) ++ .construct() + .mutable_ready() + .build(), + glib::ParamSpecString::builder("uri") +@@ -812,7 +822,7 @@ impl ObjectImpl for S3Sink { + } + } + "part-size" => { +- settings.buffer_size = value.get::().expect("type checked upstream"); ++ settings.buffer_size = value.get::().expect("type checked upstream").try_into().unwrap(); + } + "uri" => { + let _ = self.set_uri(value.get().expect("type checked upstream")); +@@ -897,7 +907,7 @@ impl ObjectImpl for S3Sink { + "key" => settings.key.to_value(), + "bucket" => settings.bucket.to_value(), + "region" => settings.region.to_string().to_value(), +- "part-size" => settings.buffer_size.to_value(), ++ "part-size" => (settings.buffer_size as u64).to_value(), + "uri" => { + let url = self.url.lock().unwrap(); + let url = match *url { +-- +2.34.1 + + +From 523a673f0e3199798be29c0bf9f4606f0b196b62 Mon Sep 17 00:00:00 2001 +From: Thomas Goodwin +Date: Thu, 13 Jun 2024 09:34:44 -0400 +Subject: [PATCH 3/9] awss3sink: part_num represents current part buffer + +Previously, the number would be incremented only when +about to flush (upload), so during the filling process +it would represent the previous part number while the +buffer represented the current. This change (and +comments added) are to clarify that these two are now +in sync. 
+ +Signed-off-by: Thomas Goodwin +--- + net/aws/src/s3sink/multipartsink.rs | 18 +++++++++--------- + 1 file changed, 9 insertions(+), 9 deletions(-) + +diff --git a/net/aws/src/s3sink/multipartsink.rs b/net/aws/src/s3sink/multipartsink.rs +index 1d0ea7c6..cf51004e 100644 +--- a/net/aws/src/s3sink/multipartsink.rs ++++ b/net/aws/src/s3sink/multipartsink.rs +@@ -57,9 +57,9 @@ const MAX_MULTIPART_NUMBER: i64 = 10000; + + struct Started { + client: Client, +- buffer: Vec, ++ buffer: Vec, // the active part's buffer + upload_id: String, +- part_number: i64, ++ part_number: i64, // the active part number + completed_parts: Vec, + } + +@@ -69,7 +69,7 @@ impl Started { + client, + buffer, + upload_id, +- part_number: 0, ++ part_number: 1, + completed_parts: Vec::new(), + } + } +@@ -258,8 +258,6 @@ impl S3Sink { + } + }; + +- let part_number = state.part_number; +- + let upload_part_req_future = upload_part_req.send(); + let output = + s3utils::wait(&self.canceller, upload_part_req_future).map_err(|err| match err { +@@ -275,11 +273,14 @@ impl S3Sink { + + let completed_part = CompletedPart::builder() + .set_e_tag(output.e_tag) +- .set_part_number(Some(part_number as i32)) ++ .set_part_number(Some(state.part_number as i32)) + .build(); + state.completed_parts.push(completed_part); + +- gst::info!(CAT, imp: self, "Uploaded part {}", part_number); ++ gst::info!(CAT, imp: self, "Uploaded part {}", state.part_number); ++ ++ // Increment part number ++ state.increment_part_number()?; + + Ok(()) + } +@@ -298,7 +299,6 @@ impl S3Sink { + } + }; + +- let part_number = state.increment_part_number()?; + let body = Some(ByteStream::from(std::mem::replace( + &mut state.buffer, + Vec::with_capacity(settings.buffer_size as usize), +@@ -315,7 +315,7 @@ impl S3Sink { + .set_bucket(bucket) + .set_key(key) + .set_upload_id(upload_id) +- .set_part_number(Some(part_number as i32)); ++ .set_part_number(Some(state.part_number as i32)); + + Ok(upload_part) + } +-- +2.34.1 + + +From 
8b8de71a7a39a22727cd4ecfa9f5eb968f3d61d1 Mon Sep 17 00:00:00 2001 +From: Thomas Goodwin +Date: Fri, 17 May 2024 13:22:55 -0400 +Subject: [PATCH 4/9] awss3sink: added UploaderPartCache history with tests + +The UploaderPartCache stores data about the buffers +and sizes that have been sent to AWS over the multi- +part uploader. The design retains a copy of the +buffers that fall within the limits of 'depth' as well +as the size of the buffer (whether retained or not) +so that various features can be leveraged to support +a partial seek functionality. + +Signed-off-by: Thomas Goodwin +--- + net/aws/src/s3sink/multipartsink.rs | 411 ++++++++++++++++++++++++++++ + 1 file changed, 411 insertions(+) + +diff --git a/net/aws/src/s3sink/multipartsink.rs b/net/aws/src/s3sink/multipartsink.rs +index cf51004e..178dbb8b 100644 +--- a/net/aws/src/s3sink/multipartsink.rs ++++ b/net/aws/src/s3sink/multipartsink.rs +@@ -90,6 +90,417 @@ impl Started { + } + } + ++struct PartInfo { ++ pub buffer: Vec, ++ pub data_size: usize, ++} ++ ++impl PartInfo { ++ fn new(size: usize, capacity: usize) -> PartInfo { ++ PartInfo { ++ buffer: Vec::with_capacity(capacity), ++ data_size: size, ++ } ++ } ++} ++ ++struct UploaderPartCache { ++ from_head: bool, ++ max_depth: usize, ++ cache: Vec, ++} ++ ++impl UploaderPartCache { ++ pub fn new(depth: i64) -> UploaderPartCache { ++ UploaderPartCache { ++ from_head: (depth > 0), ++ max_depth: depth.abs().try_into().unwrap(), ++ cache: Default::default(), ++ } ++ } ++ ++ pub fn get>(&self, part_num: T) -> Option<&PartInfo> { ++ let part_num = part_num.into() as usize; ++ self.cache.get(part_num - 1) ++ } ++ ++ #[allow(unused)] ++ pub fn get_mut>(&mut self, part_num: T) -> Option<&mut PartInfo> { ++ let part_num = part_num.into() as usize; ++ self.cache.get_mut(part_num - 1) ++ } ++ ++ /** ++ * Returns the beginning and ending offsets (in bytes) that can be retrieved ++ * from the cache. 
This might be a helpful alternative to calling find if ++ * the only interest is if one can expect the find to succeed. ++ */ ++ pub fn coverage_limits(&self) -> (u64, u64) { ++ let mut beginning: Option = None; ++ let mut ending: Option = None; ++ let mut offset: u64 = 0; ++ ++ for record in self.cache.iter() { ++ let offset_after = offset + record.buffer.len() as u64; ++ if record.buffer.len() > 0 { ++ if beginning.is_none() { ++ beginning = Some(offset); ++ } ++ ++ if beginning.is_some() { ++ ending = Some(offset_after - 1); ++ } ++ ++ offset = offset_after; ++ } else if record.data_size > 0 { ++ offset += record.data_size as u64; ++ } ++ } ++ (beginning.unwrap_or(0), ending.unwrap_or(0)) ++ } ++ ++ pub fn coverage_range(&self) -> std::ops::Range { ++ let (start, end) = self.coverage_limits(); ++ start..end ++ } ++ ++ pub fn update_or_append>(&mut self, part_num: T, buffer: &Vec) -> bool { ++ // confirm the part number makes logical sense: positive, non-zero, less than ++ // the maximum amount permitted by AWS. 
++ let part_num = part_num.into() as usize; ++ let max_part: usize = MAX_MULTIPART_NUMBER.try_into().unwrap(); ++ let part_idx = part_num - 1; ++ if part_num > max_part || part_num == 0 || part_idx > self.cache.len() { ++ return false; ++ } ++ ++ if part_idx == self.cache.len() { ++ // Insert new one ++ // NOTE: will deal with buffer later if necessary ++ self.cache ++ .push(PartInfo::new(buffer.len(), buffer.capacity())); ++ } else { ++ // update data size, at a minimum ++ self.cache[part_idx].data_size = buffer.len(); ++ } ++ ++ if self.max_depth > 0 { ++ let first: usize; ++ let last: usize; ++ ++ if self.from_head { ++ // Keeping up to the first N buffers ++ last = self.max_depth - 1; ++ first = 0; ++ } else { ++ // Keeping the last / most recent N buffers ++ last = self.cache.len(); ++ if self.max_depth < last { ++ let l = last as isize; ++ let d = self.max_depth as isize; ++ first = (l - d).try_into().unwrap(); ++ } else { ++ first = 0; ++ } ++ } ++ ++ for (i, part) in self.cache.iter_mut().enumerate() { ++ if first <= i && i <= last { ++ // part is in 'retain' range ++ if i == part_idx { ++ // 'buffer' is this part; update it ++ part.buffer = buffer.to_owned(); ++ part.data_size = buffer.len(); ++ } ++ } else { ++ // part not in 'retain' range; ensure it's cleared. ++ part.buffer.clear(); ++ } ++ } ++ } ++ return true; ++ } ++ ++ #[allow(unused)] ++ pub fn get_copy>( ++ &self, ++ part_num: T, ++ buffer: &mut Vec, ++ size: &mut usize, ++ ) -> bool { ++ match self.get(part_num) { ++ Some(part) => { ++ *buffer = part.buffer.to_vec(); ++ *size = part.data_size; ++ true ++ } ++ None => false, ++ } ++ } ++ ++ /** ++ * Find 'offset' within the part cache (or don't). If found, 'part_num' and 'size' will ++ * be from the cache. If the buffer was stored per the configuration, then 'buffer' will ++ * be filled with a copy. 
++ */ ++ pub fn find(&self, offset: u64, part_num: &mut u16) -> Result<&PartInfo, gst::ErrorMessage> { ++ let mut i = 1; ++ let mut start = 0_u64; ++ ++ for item in self.cache.iter() { ++ let item_size: u64 = item.data_size.try_into().unwrap(); ++ let range = start..start + item_size; ++ ++ if range.contains(&offset) { ++ *part_num = i; ++ return Ok(self.get(i).unwrap()); ++ } ++ i += 1; ++ start += item_size; ++ } ++ return Err(gst::error_msg!( ++ gst::ResourceError::NotFound, ++ ["Could not find part {i} in cache"] ++ )); ++ } ++} ++ ++// Tests for UploaderPartCache ++#[cfg(test)] ++mod tests { ++ use crate::s3sink::multipartsink::UploaderPartCache; ++ ++ /** ++ * Test inserts a buffer of length 100 ++ */ ++ #[test] ++ fn cache_disabled() { ++ const DEPTH: i64 = 0; ++ const SIZE_BUFFER: usize = 100; ++ ++ let mut uut = UploaderPartCache::new(DEPTH); ++ let buffer = vec![0; SIZE_BUFFER]; ++ let mut part_num: u16 = 0; ++ ++ // Nothing stored, so cache availability. ++ let mut limits = uut.coverage_limits(); ++ assert_eq!(0, limits.0); ++ assert_eq!(0, limits.1); ++ ++ // Insert and 'find' should both be TRUE since we're looking for the ++ // cached buffer that would have offset 50 in it. The resulting ++ // buffer however is empty, since caching is "disabled". ++ assert_eq!(uut.cache.len(), 0); ++ assert!(uut.update_or_append(1_usize, &buffer)); ++ assert_eq!(uut.cache.len(), 1); ++ ++ let result = uut.find(SIZE_BUFFER as u64 / 2, &mut part_num).unwrap(); ++ assert_eq!(0, result.buffer.len()); ++ assert_eq!(SIZE_BUFFER, result.buffer.capacity()); ++ assert_eq!(1, part_num); ++ ++ // 'get' should work too, same behavior as above since caching ++ // of the contents of the part is disabled. 
++ let mut out_buffer = vec![0; SIZE_BUFFER]; ++ let mut out_buffer_size: usize = 0; ++ assert!(uut.get_copy(part_num, &mut out_buffer, &mut out_buffer_size)); ++ assert_eq!(out_buffer.len(), 0); ++ assert_eq!(SIZE_BUFFER, out_buffer_size); ++ ++ // Still nothing actually stored in the cache, so still 0's for ++ // the limits. ++ limits = uut.coverage_limits(); ++ assert_eq!(0, limits.0); ++ assert_eq!(0, limits.1); ++ } ++ ++ /** ++ * Push 3 100 byte parts and verify 'find' gets ++ * the right parts vs. the offsets. ++ * Part 1: 0 - 99 ++ * Part 2: 100 - 199 ++ * Part 3: 200 - 299 ++ */ ++ #[test] ++ fn find_by_offset() { ++ const BUFFER_SIZE: usize = 100; ++ const NUM_PARTS: usize = 3; ++ let mut uut = UploaderPartCache::new(0); ++ ++ // Populate the cache ++ for i in 1..NUM_PARTS as usize { ++ assert_eq!(uut.cache.len(), i - 1); ++ assert!(uut.update_or_append(i, &vec![0; BUFFER_SIZE])); ++ assert_eq!(uut.cache.len(), i); ++ ++ // coverage offsets should be unchanged; depth is 0 (no cache). 
++ let limits = uut.coverage_limits(); ++ assert_eq!(0, limits.0); ++ assert_eq!(0, limits.1); ++ } ++ ++ // Validate the cache offsets ++ let mut offset_start: u64 = 0; ++ for i in 1..NUM_PARTS as u16 { ++ let mut out_part_num = 0; ++ let offset_end = (offset_start + BUFFER_SIZE as u64) - 1; ++ ++ let mut result = uut.find(offset_start, &mut out_part_num); ++ assert!(result.is_ok()); ++ assert_eq!(out_part_num, i); ++ ++ result = uut.find(offset_end, &mut out_part_num); ++ assert!(result.is_ok()); ++ assert_eq!(out_part_num, i); ++ ++ offset_start = offset_end + 1; ++ } ++ } ++ ++ /** ++ * Test various failure modes for cache misses on insert/update and get ++ */ ++ #[test] ++ fn cache_miss() { ++ const BUFFER_SIZE: usize = 100; ++ let mut uut = UploaderPartCache::new(0); ++ let mut out_buffer_size = 0; ++ let mut out_buffer: Vec = Default::default(); ++ let mut out_part_num = 0; ++ let buffer = vec![0; BUFFER_SIZE]; ++ ++ // Should not be able to find anything; nothing exists yet. ++ assert!(uut.find(20, &mut out_part_num).is_err()); ++ ++ // Should not be able to get the first part, it hasn't been inserted. ++ assert!(!uut.get_copy(1_u16, &mut out_buffer, &mut out_buffer_size)); ++ ++ // Size is 0, so inserting part number 2 is invalid; this should fail. ++ assert!(!uut.update_or_append(2_usize, &out_buffer)); ++ ++ // Should be able to insert part 1 ++ assert!(uut.update_or_append(1_usize, &buffer)); ++ ++ // Should be able to access part 1, but it's buffer should be empty ++ // since it's beyond the depth being retained in the cache. ++ let result = uut.find(BUFFER_SIZE as u64 - 1, &mut out_part_num).unwrap(); ++ assert_eq!(result.buffer.len(), 0); ++ assert_eq!(result.data_size, BUFFER_SIZE); ++ ++ // Should not be able to find offset 100 since that would be part 2 ++ assert!(uut.find(100, &mut out_part_num).is_err()); ++ } ++ ++ /** ++ * Verify the behavior of retaining the first N parts, remainders are empty. 
++ */ ++ #[test] ++ fn retain_head() { ++ const BUFFER_SIZE: usize = 100; ++ let mut uut = UploaderPartCache::new(2); ++ let in_buffer = vec![0; BUFFER_SIZE]; ++ let mut out_buffer: Vec = Default::default(); ++ let mut out_buffer_size: usize = 0; ++ ++ assert_eq!(uut.cache.len(), 0); ++ ++ for i in 1..=uut.max_depth + 1 { ++ let mut temp: Vec = Default::default(); ++ let mut temp_size = 0 as usize; ++ ++ assert!(uut.update_or_append(i, &in_buffer)); ++ ++ // Since this is head retention, immediately upon insertion ++ // if the part number is within the limit, it should be kept, ++ // otherwise immediately dropped. ++ assert!(uut.get_copy(i, &mut temp, &mut temp_size)); ++ if i <= uut.max_depth { ++ // Retained ++ assert!(temp.len() != 0); ++ assert!(temp_size == BUFFER_SIZE); ++ } else { ++ // Dropped ++ assert!(temp.len() == 0); ++ assert!(temp_size == BUFFER_SIZE); ++ } ++ } ++ // There should be 3 parts in the cache (though only 2 are retained). ++ assert_eq!(uut.cache.len(), 3); ++ ++ // Coverage offsets should be 0 to BUFFER_SIZE*2 - 1 (the end of ++ // buffer 2). ++ let offsets = uut.coverage_limits(); ++ assert_eq!(0, offsets.0); ++ assert_eq!((BUFFER_SIZE as u64) * 2 - 1, offsets.1); ++ ++ // 1 and 2 should have a buffer, 3 should not. ++ assert!(uut.get_copy(1_u16, &mut out_buffer, &mut out_buffer_size)); ++ assert!(out_buffer.len() == BUFFER_SIZE); ++ assert!(out_buffer_size == BUFFER_SIZE); ++ out_buffer.clear(); ++ out_buffer_size = 0; ++ ++ assert!(uut.get_copy(2_u16, &mut out_buffer, &mut out_buffer_size)); ++ assert!(out_buffer.len() == BUFFER_SIZE); ++ assert!(out_buffer_size == BUFFER_SIZE); ++ out_buffer.clear(); ++ out_buffer_size = 0; ++ ++ assert!(uut.get_copy(3_u16, &mut out_buffer, &mut out_buffer_size)); ++ assert!(out_buffer.len() == 0); ++ assert!(out_buffer_size == BUFFER_SIZE); ++ } ++ ++ /** ++ * Verify the behavior of retaining the last N parts, remainders are empty. 
++ */ ++ #[test] ++ fn retain_tail() { ++ const BUFFER_SIZE: usize = 100; ++ let mut uut = UploaderPartCache::new(-2); ++ let in_buffer = vec![0; BUFFER_SIZE]; ++ let mut out_buffer: Vec = Default::default(); ++ let mut out_buffer_size = 0; ++ ++ assert_eq!(uut.cache.len(), 0); ++ for i in 1..=uut.max_depth + 1 { ++ let mut temp: Vec = Default::default(); ++ let mut temp_size = 0 as usize; ++ ++ assert!(uut.update_or_append(i, &in_buffer)); ++ ++ // Since this is tail retention, the most recent part should ++ // always be retained. ++ assert!(uut.get_copy(i, &mut temp, &mut temp_size)); ++ assert!(temp.len() != 0); ++ assert!(temp_size == BUFFER_SIZE); ++ } ++ ++ // 1 should not have a buffer, 2 and 3 should. ++ assert!(uut.get_copy(1_usize, &mut out_buffer, &mut out_buffer_size)); ++ assert!(out_buffer.len() == 0); ++ assert!(out_buffer_size == BUFFER_SIZE); ++ out_buffer.clear(); ++ out_buffer_size = 0; ++ ++ assert!(uut.get_copy(2_usize, &mut out_buffer, &mut out_buffer_size)); ++ assert!(out_buffer.len() == BUFFER_SIZE); ++ assert!(out_buffer_size == BUFFER_SIZE); ++ out_buffer.clear(); ++ out_buffer_size = 0; ++ ++ assert!(uut.get_copy(3_usize, &mut out_buffer, &mut out_buffer_size)); ++ assert!(out_buffer.len() == BUFFER_SIZE); ++ assert!(out_buffer_size == BUFFER_SIZE); ++ ++ // Coverage offsets should be BUFFER_SIZE to BUFFER_SIZE*3 - 1 (the end of ++ // buffer 2). ++ let offsets = uut.coverage_limits(); ++ assert_eq!(BUFFER_SIZE as u64, offsets.0); ++ assert_eq!((BUFFER_SIZE as u64) * 3 - 1, offsets.1); ++ } ++} ++ + #[derive(Default)] + enum State { + #[default] +-- +2.34.1 + + +From 083f92ea4ce6c6b3cf841745c5732d07eac06faf Mon Sep 17 00:00:00 2001 +From: Thomas Goodwin +Date: Wed, 22 May 2024 13:22:43 -0400 +Subject: [PATCH 5/9] awss3sink: added 'num-cache-parts', instantiated cache + +Default configuration is 0, no cache, as prior to integration. 
+ +Signed-off-by: Thomas Goodwin +--- + net/aws/src/s3sink/multipartsink.rs | 25 ++++++++++++++++++++++++- + 1 file changed, 24 insertions(+), 1 deletion(-) + +diff --git a/net/aws/src/s3sink/multipartsink.rs b/net/aws/src/s3sink/multipartsink.rs +index 178dbb8b..9f046d61 100644 +--- a/net/aws/src/s3sink/multipartsink.rs ++++ b/net/aws/src/s3sink/multipartsink.rs +@@ -39,6 +39,7 @@ use super::OnError; + + const DEFAULT_FORCE_PATH_STYLE: bool = false; + const DEFAULT_RETRY_ATTEMPTS: u32 = 5; ++const DEFAULT_NUM_CACHED_PARTS: i64 = 0; + const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing; + + // General setting for create / abort requests +@@ -61,16 +62,23 @@ struct Started { + upload_id: String, + part_number: i64, // the active part number + completed_parts: Vec, ++ cache: UploaderPartCache, + } + + impl Started { +- pub fn new(client: Client, buffer: Vec, upload_id: String) -> Started { ++ pub fn new( ++ client: Client, ++ buffer: Vec, ++ upload_id: String, ++ num_cache_parts: i64, ++ ) -> Started { + Started { + client, + buffer, + upload_id, + part_number: 1, + completed_parts: Vec::new(), ++ cache: UploaderPartCache::new(num_cache_parts), + } + } + +@@ -516,6 +524,7 @@ struct Settings { + content_type: Option, + content_disposition: Option, + buffer_size: usize, ++ num_cached_parts: i64, + access_key: Option, + secret_access_key: Option, + session_token: Option, +@@ -575,6 +584,7 @@ impl Default for Settings { + session_token: None, + metadata: None, + buffer_size: 0, ++ num_cached_parts: DEFAULT_NUM_CACHED_PARTS, + retry_attempts: DEFAULT_RETRY_ATTEMPTS, + multipart_upload_on_error: DEFAULT_MULTIPART_UPLOAD_ON_ERROR, + request_timeout: Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC), +@@ -977,6 +987,7 @@ impl S3Sink { + client, + Vec::with_capacity(settings.buffer_size as usize), + upload_id, ++ settings.num_cached_parts, + )); + + Ok(()) +@@ -1101,6 +1112,14 @@ impl ObjectImpl for S3Sink { + .construct() + .mutable_ready() + .build(), ++ 
glib::ParamSpecInt64::builder("num-cached-parts") ++ .nick("Number of parts to cache (seeking)") ++ .blurb("Number of parts to cache to enable seeking before the multipart upload completes") ++ .minimum(-1 * MAX_MULTIPART_NUMBER) ++ .maximum(MAX_MULTIPART_NUMBER) ++ .default_value(DEFAULT_NUM_CACHED_PARTS) ++ .mutable_ready() ++ .build(), + glib::ParamSpecString::builder("uri") + .nick("URI") + .blurb("The S3 object URI") +@@ -1235,6 +1254,9 @@ impl ObjectImpl for S3Sink { + "part-size" => { + settings.buffer_size = value.get::().expect("type checked upstream").try_into().unwrap(); + } ++ "num-cached-parts" => { ++ settings.num_cached_parts = value.get::().expect("type checked upstream"); ++ } + "uri" => { + let _ = self.set_uri(value.get().expect("type checked upstream")); + } +@@ -1319,6 +1341,7 @@ impl ObjectImpl for S3Sink { + "bucket" => settings.bucket.to_value(), + "region" => settings.region.to_string().to_value(), + "part-size" => (settings.buffer_size as u64).to_value(), ++ "num-cached-parts" => settings.num_cached_parts.to_value(), + "uri" => { + let url = self.url.lock().unwrap(); + let url = match *url { +-- +2.34.1 + + +From e81b6d2987cc6891902ad2be9bb21400ed608ed5 Mon Sep 17 00:00:00 2001 +From: Thomas Goodwin +Date: Thu, 13 Jun 2024 11:03:57 -0400 +Subject: [PATCH 6/9] awss3sink: make multipart test preamble reusable + +Moved some of the environment/uri logic out of the singular +multipart test so that it can be reused in the future +tests against the multipart uploader. 
+ +Signed-off-by: Thomas Goodwin +--- + net/aws/tests/s3.rs | 19 ++++++++++++++----- + 1 file changed, 14 insertions(+), 5 deletions(-) + +diff --git a/net/aws/tests/s3.rs b/net/aws/tests/s3.rs +index 5d21d35c..8b5ad49d 100644 +--- a/net/aws/tests/s3.rs ++++ b/net/aws/tests/s3.rs +@@ -55,15 +55,24 @@ mod tests { + .unwrap(); + } + +- // Common helper +- async fn do_s3_multipart_test(key_prefix: &str) { +- init(); +- ++ fn get_env_args(key_prefix: &str) -> (String, String, String) { + let region = std::env::var("AWS_REGION").unwrap_or_else(|_| DEFAULT_S3_REGION.to_string()); + let bucket = + std::env::var("AWS_S3_BUCKET").unwrap_or_else(|_| "gst-plugins-rs-tests".to_string()); + let key = format!("{key_prefix}-{:?}.txt", chrono::Utc::now()); +- let uri = format!("s3://{region}/{bucket}/{key}"); ++ (region, bucket, key) ++ } ++ ++ fn get_uri(region: &str, bucket: &str, key: &str) -> String { ++ format!("s3://{region}/{bucket}/{key}") ++ } ++ ++ // Common helper ++ async fn do_s3_multipart_test(key_prefix: &str) { ++ init(); ++ ++ let (region, bucket, key) = get_env_args(key_prefix); ++ let uri = get_uri(®ion, &bucket, &key); + let content = "Hello, world!\n".as_bytes(); + + // Manually add the element so we can configure it before it goes to PLAYING +-- +2.34.1 + + +From 79eb0eb1e9d3ce7876a7c1999399351c49a51ff5 Mon Sep 17 00:00:00 2001 +From: Thomas Goodwin +Date: Thu, 13 Jun 2024 11:19:42 -0400 +Subject: [PATCH 7/9] awss3sink: implemented position query + +Added data member, upload_pos, representing the current +write head position within the overall upload. It is +incremented when flushing the buffer. Test added to +verify it does indeed update as buffers are flushed +to s3. 
+ +Signed-off-by: Thomas Goodwin +--- + net/aws/src/s3sink/multipartsink.rs | 32 +++++++++++++++++++ + net/aws/tests/s3.rs | 48 +++++++++++++++++++++++++++++ + 2 files changed, 80 insertions(+) + +diff --git a/net/aws/src/s3sink/multipartsink.rs b/net/aws/src/s3sink/multipartsink.rs +index 9f046d61..e5c53501 100644 +--- a/net/aws/src/s3sink/multipartsink.rs ++++ b/net/aws/src/s3sink/multipartsink.rs +@@ -63,6 +63,10 @@ struct Started { + part_number: i64, // the active part number + completed_parts: Vec, + cache: UploaderPartCache, ++ // The overall upload's current write head position. ++ // Given the AWS limits of 5GB part size and 10k parts, this is: ++ // (5*1024^3) * 10,000 = 53,687,091,200,000, a 64-bit number. ++ upload_pos: u64, + } + + impl Started { +@@ -79,6 +83,7 @@ impl Started { + part_number: 1, + completed_parts: Vec::new(), + cache: UploaderPartCache::new(num_cache_parts), ++ upload_pos: 0, + } + } + +@@ -720,6 +725,9 @@ impl S3Sink { + } + }; + ++ // Update the position within the upload ++ state.upload_pos += state.buffer.len() as u64; ++ + let body = Some(ByteStream::from(std::mem::replace( + &mut state.buffer, + Vec::with_capacity(settings.buffer_size as usize), +@@ -1506,6 +1514,30 @@ impl BaseSinkImpl for S3Sink { + Ok(()) + } + ++ fn query(&self, query: &mut gst::QueryRef) -> bool { ++ match query.view_mut() { ++ gst::QueryViewMut::Formats(fmt) => { ++ fmt.set(&vec![gst::Format::Bytes]); ++ true ++ } ++ gst::QueryViewMut::Position(pos) => { ++ if pos.format() == gst::Format::Bytes { ++ let mut state = self.state.lock().unwrap(); ++ match *state { ++ State::Started(ref mut started_state) => { ++ pos.set(gst::format::Bytes::from_u64(started_state.upload_pos)); ++ true ++ } ++ _ => false, ++ } ++ } else { ++ false ++ } ++ } ++ _ => BaseSinkImplExt::parent_query(self, query), ++ } ++ } ++ + fn event(&self, event: gst::Event) -> bool { + if let gst::EventView::Eos(_) = event.view() { + if let Err(error_message) = self.finalize_upload() { +diff 
--git a/net/aws/tests/s3.rs b/net/aws/tests/s3.rs +index 8b5ad49d..ebfc54bb 100644 +--- a/net/aws/tests/s3.rs ++++ b/net/aws/tests/s3.rs +@@ -200,6 +200,54 @@ mod tests { + do_s3_multipart_test("s3 🧪 😱").await; + } + ++ #[ignore = "failing, needs investigation"] ++ #[tokio::test] ++ async fn test_s3_multipart_query_position() { ++ // Verfies that the basesink::query handler is correctly handling Bytes -formatted ++ // position queries according to the current position in the overall upload. ++ init(); ++ ++ let (region, bucket, key) = get_env_args("query-position"); ++ let uri = get_uri(®ion, &bucket, &key); ++ ++ let mut h1 = gst_check::Harness::new_parse(&format!( ++ "awss3sink name=\"sink\" uri=\"{uri}\" num-cached-parts=1" ++ )); ++ let h1el = h1 ++ .element() ++ .unwrap() ++ .dynamic_cast::() ++ .unwrap() ++ .by_name("sink") ++ .unwrap(); ++ let part_size = h1el.property::("part-size"); ++ let pad = h1el.static_pad("sink").unwrap(); ++ ++ // Start. ++ h1.play(); ++ ++ // Position should be 0 on start ++ let mut position = pad.query_position::(); ++ assert_eq!(0, u64::from(position.unwrap())); ++ ++ // Push stream start, segment, and 3 buffers ++ let segment = gst::FormattedSegment::::new(); ++ h1.push_event(gst::event::StreamStart::builder(&"test-stream").build()); ++ h1.push_event(gst::event::Segment::new(&segment)); ++ for i in 1..=3 as u8 { ++ let buffer = make_buffer(&vec![i; part_size.try_into().unwrap()]); ++ h1.push(buffer).unwrap(); ++ ++ // Verify position is updating ++ position = pad.query_position::(); ++ assert_eq!(part_size * (i as u64), u64::from(position.unwrap())); ++ } ++ ++ // Finish and clean up. 
++ h1.push_event(gst::event::Eos::new()); ++ delete_object(region.clone(), &bucket, &key).await; ++ } ++ + #[ignore = "failing, needs investigation"] + #[tokio::test] + async fn test_s3_put_object_simple() { +-- +2.34.1 + + +From 89b4885dbe865013ee2e585836cf7cf60b0248f1 Mon Sep 17 00:00:00 2001 +From: Thomas Goodwin +Date: Thu, 13 Jun 2024 11:23:34 -0400 +Subject: [PATCH 8/9] awss3sink: implemented seeking query + +As noted in comments, the reply here is not something +we can accurately implement because there may be times +when the seekable region is actually in two different +ranges: the current buffer and the buffers within the +cache. Splitting the difference then, the reply will +be "everything" if the cache is configured to retain +any buffers, and "nothing" / not seekable if it is not +(the default). + +Signed-off-by: Thomas Goodwin +--- + net/aws/src/s3sink/multipartsink.rs | 37 ++++++++++++++++++++++++++++ + net/aws/tests/s3.rs | 38 +++++++++++++++++++++++++++++ + 2 files changed, 75 insertions(+) + +diff --git a/net/aws/src/s3sink/multipartsink.rs b/net/aws/src/s3sink/multipartsink.rs +index e5c53501..c98353ea 100644 +--- a/net/aws/src/s3sink/multipartsink.rs ++++ b/net/aws/src/s3sink/multipartsink.rs +@@ -1534,6 +1534,43 @@ impl BaseSinkImpl for S3Sink { + false + } + } ++ gst::QueryViewMut::Seeking(seek) => { ++ let (seekable, start, stop) = match seek.format() { ++ gst::Format::Bytes => { ++ let mut state = self.state.lock().unwrap(); ++ match *state { ++ State::Started(ref mut started_state) => { ++ // TODO: unclear how to really respond here since it is ++ // possible to seek to within the range of the cache OR to ++ // within the range of the currently active buffer, and when ++ // the currently active buffer is not in the bounds of what ++ // is in the cache, there's no way to provide both sets of ++ // limits to the reply. 
So if cache is enabled, give an ++ // affirmative "anywhere" response and we'll argue this point ++ // when dealing with segment events. ++ let mut max = 0_u64; ++ if started_state.cache.max_depth > 0 { ++ max = u64::MAX - 1; ++ } ++ (true, 0, max) ++ } ++ _ => (false, 0, 0), ++ } ++ } ++ _ => (false, 0, 0), ++ }; ++ ++ seek.set( ++ seekable, ++ gst::format::Bytes::from_u64(start), ++ gst::format::Bytes::from_u64(stop), ++ ); ++ // "no description available" in docs for return from query ++ // I'm going to assume 'true' here since the call did successfully ++ // determine if 'seekable' needs to be true or false and is set on ++ // the query. ++ true ++ } + _ => BaseSinkImplExt::parent_query(self, query), + } + } +diff --git a/net/aws/tests/s3.rs b/net/aws/tests/s3.rs +index ebfc54bb..3998809f 100644 +--- a/net/aws/tests/s3.rs ++++ b/net/aws/tests/s3.rs +@@ -248,6 +248,44 @@ mod tests { + delete_object(region.clone(), &bucket, &key).await; + } + ++ #[ignore = "failing, needs investigation"] ++ #[tokio::test] ++ async fn test_s3_multipart_query_seeking() { ++ // Verifies the basesink::query handler is providing correct replies to Bytes-formatted ++ // Seeking queries. For now this test only verifies that it replies to seeking queries ++ // with 0->max if caching is enabled since at this time it's unclear the right way to ++ // represent a gap in seeking capability, for example head caching of 1 part but having ++ // written into part 5, a seek request (via segment event) could go into either the ++ // part cache or it could be within the locally-held buffer's range -- either of those ++ // is logically correct, but there's only one pair of limits in this reply. Therefore ++ // this current implementation only replies 0->max if the cache is enabled and will ++ // reply negatively in the event a segment goes out of range.
++ init(); ++ ++ let (region, bucket, key) = get_env_args("query-position"); ++ let uri = get_uri(®ion, &bucket, &key); ++ ++ let h1 = gst_check::Harness::new_parse(&format!( ++ "awss3sink name=\"sink\" uri=\"{uri}\" num-cached-parts=1" ++ )); ++ let h1el = h1 ++ .element() ++ .unwrap() ++ .dynamic_cast::() ++ .unwrap() ++ .by_name("sink") ++ .unwrap(); ++ let pad = h1el.static_pad("sink").unwrap(); ++ ++ let mut q = gst::query::Seeking::new(gst::Format::Bytes); ++ assert!(pad.query(q.query_mut())); ++ ++ let (seekable, lower, upper) = q.result(); ++ assert!(seekable); ++ assert_eq!(0, lower.value() as u64); ++ assert_eq!(u64::MAX - 1, upper.value() as u64); ++ } ++ + #[ignore = "failing, needs investigation"] + #[tokio::test] + async fn test_s3_put_object_simple() { +-- +2.34.1 + + +From 9324eb0b443cd0d3d0a015671ef05f792ce72254 Mon Sep 17 00:00:00 2001 +From: Thomas Goodwin +Date: Thu, 13 Jun 2024 16:08:46 -0400 +Subject: [PATCH 9/9] awss3sink: implemented seek via segment event + +Added support for seeking into the configured cache, +into the actively-filling part, detection of cache +misses, etc. Test coverage increased. + +Signed-off-by: Thomas Goodwin +--- + net/aws/src/s3sink/multipartsink.rs | 256 +++++++++++++++++++++++-- + net/aws/tests/s3.rs | 279 ++++++++++++++++++++++++++++ + 2 files changed, 523 insertions(+), 12 deletions(-) + +diff --git a/net/aws/src/s3sink/multipartsink.rs b/net/aws/src/s3sink/multipartsink.rs +index c98353ea..83e0f85b 100644 +--- a/net/aws/src/s3sink/multipartsink.rs ++++ b/net/aws/src/s3sink/multipartsink.rs +@@ -58,7 +58,12 @@ const MAX_MULTIPART_NUMBER: i64 = 10000; + + struct Started { + client: Client, +- buffer: Vec, // the active part's buffer ++ // the active part's buffer ++ buffer: Vec, ++ // active buffer's "data size" represents the last offset of ++ // data that was written to the buffer prior to manually setting ++ // the buffer's len() during a seek operation. 
++ buffer_data_size: usize, + upload_id: String, + part_number: i64, // the active part number + completed_parts: Vec, +@@ -79,6 +84,7 @@ impl Started { + Started { + client, + buffer, ++ buffer_data_size: 0, + upload_id, + part_number: 1, + completed_parts: Vec::new(), +@@ -606,6 +612,8 @@ pub struct S3Sink { + state: Mutex, + canceller: Mutex, + abort_multipart_canceller: Mutex, ++ eos_pending: Mutex, ++ write_will_cache_miss: Mutex, + } + + static CAT: Lazy = Lazy::new(|| { +@@ -670,9 +678,19 @@ impl S3Sink { + } + } + ++ /** ++ * Creates a part upload request using the current buffer (clearing it) ++ * and then sending the request (thus uploading the buffer). Upon ++ * successful completion, the completed part information is added to ++ * the state. ++ */ + fn flush_current_buffer(&self) -> Result<(), Option> { + let upload_part_req = self.create_upload_part_request()?; + ++ if upload_part_req.is_none() { ++ return Ok(()); // nothing to do here. ++ } ++ + let mut state = self.state.lock().unwrap(); + let state = match *state { + State::Started(ref mut started_state) => started_state, +@@ -684,7 +702,7 @@ impl S3Sink { + } + }; + +- let upload_part_req_future = upload_part_req.send(); ++ let upload_part_req_future = upload_part_req.unwrap().send(); + let output = + s3utils::wait(&self.canceller, upload_part_req_future).map_err(|err| match err { + WaitError::FutureError(err) => { +@@ -701,19 +719,69 @@ impl S3Sink { + .set_e_tag(output.e_tag) + .set_part_number(Some(state.part_number as i32)) + .build(); +- state.completed_parts.push(completed_part); ++ ++ // Update or replace the completed part ++ match state ++ .completed_parts ++ .get_mut((state.part_number - 1) as usize) ++ { ++ Some(part) => *part = completed_part, ++ None => state.completed_parts.push(completed_part), ++ } ++ ++ // state.buffer_data_size is still set to the length of this most recently uploaded ++ // buffer, so increment the global write head position (upload_pos) with it. 
++ state.upload_pos += state.buffer_data_size as u64; + + gst::info!(CAT, imp: self, "Uploaded part {}", state.part_number); + + // Increment part number + state.increment_part_number()?; + +- Ok(()) ++ // There are a few cases to check: ++ // 1. OK: N/A -> "new" part that will append the upload ++ // 2. OK: cache -> cached part ++ // 3. ERR: cache -> "new" part that will NOT append the upload (cache miss) ++ // ++ // If the cache.get() returns a record: ++ // 1. This current part number will NOT be appending the upload. ++ // 2. If the buffer is empty, it is a cache miss. ++ // Check if this part should come from cache ++ let eos_pending = self.eos_pending.lock().unwrap(); ++ match state.cache.get(state.part_number as usize) { ++ Some(part) => { ++ if part.buffer.is_empty() { ++ if false == *eos_pending { ++ // Cache miss (case 3) -- the part number was known but the buffer ++ // was not stored in the cache (because of the cache configuration). ++ *self.write_will_cache_miss.lock().unwrap() = true; ++ state.buffer_data_size = 0; ++ gst::debug!(CAT, imp:self, "Next write will cause a cache miss unless another seek is performed."); ++ } ++ } else { ++ // Cache hit (case 2) -- the part number was known and the buffer ++ // was a part of the cache ++ state.buffer = part.buffer.to_owned(); ++ state.buffer_data_size = part.data_size; ++ } ++ Ok(()) ++ } ++ None => { ++ // case 1 ++ state.buffer_data_size = 0; ++ Ok(()) ++ } ++ } + } + +- fn create_upload_part_request(&self) -> Result { ++ /** ++ * Creates the part upload request, moving the active buffer's contents ++ * into the request and then resetting the buffer. 
++ */ ++ fn create_upload_part_request( ++ &self, ++ ) -> Result, gst::ErrorMessage> { + let url = self.url.lock().unwrap(); +- let settings = self.settings.lock().unwrap(); + let mut state = self.state.lock().unwrap(); + let state = match *state { + State::Started(ref mut started_state) => started_state, +@@ -725,12 +793,28 @@ impl S3Sink { + } + }; + +- // Update the position within the upload +- state.upload_pos += state.buffer.len() as u64; ++ if state.buffer_data_size == 0 { ++ // Nothing to upload. ++ return Ok(None); ++ } + ++ // Update buffer len() to buffer_data_size, in the event the two ++ // became out of sync because of seeking. This ensures that the ++ // data size is stored correctly in the cache and copied fully ++ // into the request body. ++ unsafe { ++ state.buffer.set_len(state.buffer_data_size); ++ } ++ ++ // Update/append the part cache ++ state ++ .cache ++ .update_or_append(state.part_number as usize, &state.buffer); ++ ++ let capacity = state.buffer.capacity(); + let body = Some(ByteStream::from(std::mem::replace( + &mut state.buffer, +- Vec::with_capacity(settings.buffer_size as usize), ++ Vec::with_capacity(capacity), + ))); + + let bucket = Some(url.as_ref().unwrap().bucket.to_owned()); +@@ -746,7 +830,7 @@ impl S3Sink { + .set_upload_id(upload_id) + .set_part_number(Some(state.part_number as i32)); + +- Ok(upload_part) ++ Ok(Some(upload_part)) + } + + fn create_complete_multipart_upload_request( +@@ -1001,6 +1085,11 @@ impl S3Sink { + Ok(()) + } + ++ /** ++ * Add 'src' to the buffer. If this exceeds the capacity of the buffer ++ * (i.e., the configured AWS max part size), the buffer is flushed (uploaded) ++ * and the remaining portion of src starts the next buffer (part). 
++ */ + fn update_buffer(&self, src: &[u8]) -> Result<(), Option> { + let mut state = self.state.lock().unwrap(); + let started_state = match *state { +@@ -1018,8 +1107,18 @@ impl S3Sink { + src.len(), + ); + ++ if *self.write_will_cache_miss.lock().unwrap() && to_copy > 0 { ++ return Err(Some(gst::error_msg!( ++ gst::ResourceError::NotFound, ++ ["Cache miss on write has occurred"] ++ ))); ++ } ++ + let (head, tail) = src.split_at(to_copy); + started_state.buffer.extend_from_slice(head); ++ started_state.buffer_data_size = started_state ++ .buffer_data_size ++ .max(started_state.buffer.len()); + let do_flush = started_state.buffer.capacity() == started_state.buffer.len(); + drop(state); + +@@ -1027,7 +1126,7 @@ impl S3Sink { + self.flush_current_buffer()?; + } + +- if to_copy < src.len() { ++ if tail.len() > 0 { + self.update_buffer(tail)?; + } + +@@ -1065,6 +1164,109 @@ impl S3Sink { + )), + } + } ++ ++ /** ++ * Attempt to seek to some location within the overall upload. ++ * At this time, the only seeking that can happen is to places ++ * within the cached parts. If the seek is outside of the cached ++ * parts, the seek will fail. The cases to evaluate are: ++ * 1. new_offset == current position: ++ * return true (do nothing) ++ * 2. new_offset is within the current buffer: ++ * change the write head position on the buffer ++ * return true ++ * 3. new_offset is within the cache: ++ * flush ++ * switch to cached buffer, part number ++ * change the write head position on the buffer ++ */ ++ fn seek(self: &S3Sink, new_offset: u64) -> Result<(), Option> { ++ let mut state = self.state.lock().unwrap(); ++ let started_state = match *state { ++ State::Started(ref mut started_state) => started_state, ++ State::Completed => { ++ unreachable!("Upload should not be completed yet"); ++ } ++ State::Stopped => { ++ unreachable!("Element should be started"); ++ } ++ }; ++ ++ // Case 1: no-op. 
++ if started_state.upload_pos == new_offset { ++ return Ok(()); ++ } ++ ++ // Determine if new_offset is within the current part or one in the cache. ++ let part_start = ++ (started_state.part_number as u64 - 1) * started_state.buffer.capacity() as u64; ++ let part_end = part_start + started_state.buffer_data_size as u64; ++ let part_limits = part_start..part_end; ++ ++ gst::trace!(CAT, imp: self, "Current part {} {part_limits:?} - seeking to {new_offset}", started_state.part_number); ++ ++ let mut next_part = 0; ++ let cache_result = started_state.cache.find(new_offset, &mut next_part); ++ ++ let offset_in_buffer = new_offset as usize % started_state.buffer.capacity(); ++ ++ if part_limits.contains(&new_offset) { ++ gst::trace!(CAT, imp: self, "Seeking to offset {} within current buffer", new_offset); ++ started_state.buffer_data_size = started_state ++ .buffer_data_size ++ .max(started_state.buffer.len()); ++ started_state.upload_pos = new_offset; ++ unsafe { ++ started_state.buffer.set_len(offset_in_buffer); ++ } ++ ++ return Ok(()); ++ } else if cache_result.is_ok() { ++ let result = cache_result.unwrap(); ++ let next_buffer = result.buffer.to_vec(); ++ let next_size = result.data_size.to_owned(); ++ ++ if 0 < next_buffer.len() { ++ // cache hit ++ drop(state); ++ self.flush_current_buffer()?; ++ ++ // Flushed okay, lock state again and update the buffer ++ let mut state = self.state.lock().unwrap(); ++ let started_state = match *state { ++ State::Started(ref mut started_state) => started_state, ++ _ => unreachable!("Element should still be started"), ++ }; ++ ++ // Clear the cache miss flag ++ *self.write_will_cache_miss.lock().unwrap() = false; ++ ++ gst::trace!(CAT, imp: self, "Seeking to offset {} within the cache (part {})", new_offset, next_part); ++ ++ started_state.part_number = next_part.try_into().unwrap(); ++ started_state.buffer = next_buffer; ++ started_state.buffer_data_size = next_size; ++ started_state.upload_pos = new_offset; ++ unsafe { ++ 
started_state.buffer.set_len(offset_in_buffer); ++ } ++ ++ return Ok(()); ++ } else { ++ // cache miss -- the part information is known but no data was stored. ++ return Err(Some(gst::error_msg!( ++ gst::StreamError::Failed, ++ ["Buffer not cached for part {next_part}"] ++ ))); ++ } ++ } else { ++ // Seek requested to an unsupported location. ++ return Err(Some(gst::error_msg!( ++ gst::StreamError::Failed, ++ ["Attempted to seek into unreachable region"] ++ ))); ++ } ++ } + } + + #[glib::object_subclass] +@@ -1079,6 +1281,8 @@ impl ObjectImpl for S3Sink { + fn constructed(&self) { + self.parent_constructed(); + ++ *self.eos_pending.lock().unwrap() = false; ++ *self.write_will_cache_miss.lock().unwrap() = false; + self.obj().set_sync(false); + } + +@@ -1260,7 +1464,11 @@ impl ObjectImpl for S3Sink { + } + } + "part-size" => { +- settings.buffer_size = value.get::().expect("type checked upstream").try_into().unwrap(); ++ settings.buffer_size = value ++ .get::() ++ .expect("type checked upstream") ++ .try_into() ++ .unwrap(); + } + "num-cached-parts" => { + settings.num_cached_parts = value.get::().expect("type checked upstream"); +@@ -1577,6 +1785,8 @@ impl BaseSinkImpl for S3Sink { + + fn event(&self, event: gst::Event) -> bool { + if let gst::EventView::Eos(_) = event.view() { ++ *self.eos_pending.lock().unwrap() = true; ++ *self.write_will_cache_miss.lock().unwrap() = false; + if let Err(error_message) = self.finalize_upload() { + gst::error!( + CAT, +@@ -1586,6 +1796,28 @@ impl BaseSinkImpl for S3Sink { + ); + return false; + } ++ } else if let gst::EventView::Segment(event) = event.view() { ++ let mut state = self.state.lock().unwrap(); ++ match *state { ++ State::Started(ref mut started_state) => started_state, ++ State::Completed => { ++ unreachable!("Upload should not be completed yet"); ++ } ++ State::Stopped => { ++ unreachable!("Element should be started already"); ++ } ++ }; ++ if event.segment().format() == gst::Format::Bytes { ++ let segment = 
event.segment(); ++ drop(state); ++ // value() is an i64, however the docs do not state what a negative bytes offset ++ // would imply since in practice it seems to always be an absolute offset from 0. ++ // If we get a negative number here, let it barf here. ++ if let Err(error_message) = self.seek(segment.start().value().try_into().unwrap()) { ++ gst::error!(CAT, imp: self, "Failed to seek: {:?}", error_message); ++ return false; ++ } ++ } + } + + BaseSinkImplExt::parent_event(self, event) +diff --git a/net/aws/tests/s3.rs b/net/aws/tests/s3.rs +index 3998809f..6e2e6afb 100644 +--- a/net/aws/tests/s3.rs ++++ b/net/aws/tests/s3.rs +@@ -194,6 +194,285 @@ mod tests { + do_s3_multipart_test("s3 test").await; + } + ++ /** ++ * This test will run the sink with 1 part cached at the head (start) ++ * of the upload. It will push 12 packets (just over 2 parts), seek to ++ * a few bytes into the first part, write a change, then EOS. The ++ * expected result is the seek to use the cache, and the EOS to push the ++ * cached buffer and complete the upload. 
++ */ ++ #[ignore = "failing, needs investigation"] ++ #[tokio::test] ++ async fn test_s3_multipart_head_cached() { ++ init(); ++ ++ let (region, bucket, key) = get_env_args("head_cached"); ++ let uri = get_uri(®ion, &bucket, &key); ++ let buffer_size = 1024 * 1024; ++ let buffers_per_part = 5; ++ let part_size = buffer_size * buffers_per_part; ++ let num_buffers = 12; ++ ++ let mut h1 = gst_check::Harness::new_parse(&format!( ++ "awss3sink name=\"sink\" uri=\"{uri}\" num-cached-parts=1 part-size={part_size}" ++ )); ++ h1.set_src_caps(gst::Caps::builder("text/plain").build()); ++ h1.play(); ++ ++ // Push stream start, segment, and buffers ++ let mut segment = gst::FormattedSegment::::new(); ++ h1.push_event(gst::event::StreamStart::builder(&"test-stream").build()); ++ h1.push_event(gst::event::Segment::new(&segment)); ++ for i in 1..=num_buffers as u8 { ++ let buffer = make_buffer(&vec![i; buffer_size]); ++ h1.push(buffer).unwrap(); ++ } ++ ++ // Try to seek into part 1 (end of first packet). ++ segment = gst::FormattedSegment::::new(); ++ segment.set_start(gst::format::Bytes::from_u64( ++ buffer_size.try_into().unwrap(), ++ )); ++ assert!(h1.push_event(gst::event::Segment::new(&segment))); ++ ++ // Overwrite second packet of part 1: [01...][AA...][03...]... ++ // This should succeed. ++ h1.push(make_buffer(&vec![0xAA; buffer_size])).unwrap(); ++ ++ // EOS to finish the upload ++ h1.push_event(gst::event::Eos::new()); ++ ++ // FIXME: This seems to return too early -- before the file is finalized ++ // at S3 -- because the h2.play() occasionally fails on GstState change ++ // to playing because the file is not found. 
++ ++ // Download and verify contents ++ let mut h2 = gst_check::Harness::new("awss3src"); ++ h2.element().unwrap().set_property("uri", uri.clone()); ++ h2.play(); ++ ++ let mut location: usize = 0; ++ let mut expect = 0x00_u8; ++ loop { ++ match h2.pull() { ++ Ok(temp) => { ++ let buffer = temp.into_mapped_buffer_readable().unwrap(); ++ ++ if 0 == location % buffer_size { ++ expect += 1; ++ } ++ ++ for b in buffer.as_slice() { ++ if buffer_size <= location && location < 2 * buffer_size { ++ assert_eq!(b, &0xAA_u8); ++ } else { ++ assert_eq!(b, &expect); ++ } ++ location += 1; ++ } ++ } ++ Err(_) => break, ++ } ++ } ++ assert_eq!(location, num_buffers * buffer_size); ++ ++ delete_object(region.clone(), &bucket, &key).await; ++ } ++ ++ /** ++ * Pulls all buffers via harness.pull() until the 60-second timeout occurs, ++ * causing this method to return a buffer containing all of the others. ++ */ ++ fn pull_all_buffers_as_slice(harness: &mut gst_check::Harness) -> Vec { ++ let mut out: Vec = Vec::new(); ++ ++ loop { ++ match harness.pull() { ++ Ok(temp) => { ++ let buffer = temp.into_mapped_buffer_readable().unwrap(); ++ out.extend(buffer.as_slice()); ++ } ++ Err(_) => break, ++ } ++ } ++ ++ out ++ } ++ ++ /** ++ * This test verifies that a seek within the current part is permitted. ++ * Since the cache is not involved, subsequent writes beyond the previous "end" of ++ * of the buffer will simply continue as normal. 
++ */ ++ #[ignore = "failing, needs investigation"] ++ #[tokio::test] ++ async fn test_s3_multipart_permit_seek_in_active_part() { ++ init(); ++ ++ let (region, bucket, key) = get_env_args("seek_active_part"); ++ let uri = get_uri(®ion, &bucket, &key); ++ let buffer_size = 1024 * 1024; ++ let buffers_per_part = 5; ++ let part_size = buffer_size * buffers_per_part; ++ let num_buffers = 2; ++ ++ let mut h1 = gst_check::Harness::new_parse(&format!( ++ "awss3sink name=\"sink\" uri=\"{uri}\" num-cached-parts=1 part-size={part_size}" ++ )); ++ h1.set_src_caps(gst::Caps::builder("text/plain").build()); ++ h1.play(); ++ ++ // Push stream start, segment, and buffers ++ let mut segment = gst::FormattedSegment::::new(); ++ h1.push_event(gst::event::StreamStart::builder(&"test-stream").build()); ++ h1.push_event(gst::event::Segment::new(&segment)); ++ for i in 1..=num_buffers as u8 { ++ let buffer = make_buffer(&vec![i; buffer_size]); ++ h1.push(buffer).unwrap(); ++ } ++ ++ // Seek back to the start of the second buffer, which is still the active part. ++ segment = gst::FormattedSegment::::new(); ++ segment.set_start(gst::format::Bytes::from_u64( ++ buffer_size.try_into().unwrap(), ++ )); ++ assert!(h1.push_event(gst::event::Segment::new(&segment))); ++ ++ // Overwrite that buffer, which should succeed. ++ // This should succeed. 
++ h1.push(make_buffer(&vec![0xAA; buffer_size])).unwrap(); ++ ++ // EOS to finish the upload ++ h1.push_event(gst::event::Eos::new()); ++ ++ // Verify ++ let mut h2 = gst_check::Harness::new("awss3src"); ++ h2.element().unwrap().set_property("uri", uri.clone()); ++ h2.play(); ++ ++ let buffer = pull_all_buffers_as_slice(&mut h2); ++ assert_eq!(buffer.len(), buffer_size * 2); ++ assert!(buffer[0..(buffer_size - 1)].iter().all(|&b| b == 0x01_u8)); ++ assert!(buffer[buffer_size..].iter().all(|&b| b == 0xAA_u8)); ++ ++ delete_object(region.clone(), &bucket, &key).await; ++ } ++ ++ /** ++ * This test verifies the behavior where a seek back into the cache is successful, ++ * but the continued write goes off the edge of the cache. Since this implementation ++ * does not attempt to do any download/re-upload of previous parts, the expected result ++ * is a flow error when the boundary is crossed. ++ * ++ * The test configures 1 part head cache, writes two parts, seeks into the tail of the ++ * first part and writes a buffer that would cross the part boundary (cache miss). ++ * The expected result is a flow error. 
++ */ ++ #[ignore = "failing, needs investigation"] ++ #[tokio::test] ++ async fn test_s3_multipart_cache_miss_on_write() { ++ init(); ++ ++ let (region, bucket, key) = get_env_args("cache_miss_on_write"); ++ let uri = get_uri(®ion, &bucket, &key); ++ let buffer_size = 1024 * 1024; ++ let buffers_per_part = 5; ++ let part_size = buffer_size * buffers_per_part; ++ let num_buffers = 6; ++ ++ let mut h1 = gst_check::Harness::new_parse(&format!( ++ "awss3sink name=\"sink\" uri=\"{uri}\" num-cached-parts=1 part-size={part_size}" ++ )); ++ h1.set_src_caps(gst::Caps::builder("text/plain").build()); ++ h1.play(); ++ ++ // Push stream start, segment, and buffers ++ let mut segment = gst::FormattedSegment::::new(); ++ h1.push_event(gst::event::StreamStart::builder(&"test-stream").build()); ++ h1.push_event(gst::event::Segment::new(&segment)); ++ for i in 1..=num_buffers as u8 { ++ let buffer = make_buffer(&vec![i; buffer_size]); ++ h1.push(buffer).unwrap(); ++ } ++ ++ // Seek to near the end of part 1 (half a buffer from tail) ++ segment = gst::FormattedSegment::::new(); ++ segment.set_start(gst::format::Bytes::from_u64( ++ (part_size - (buffer_size / 2)).try_into().unwrap(), ++ )); ++ assert!(h1.push_event(gst::event::Segment::new(&segment))); ++ ++ // Write a full buffer. The first half of the write will succeed internally but the continued ++ // write will trigger a flow error response since the cache boundary will be crossed. ++ assert!(h1.push(make_buffer(&vec![0xAA; buffer_size])).is_err()); ++ } ++ ++ /** ++ * This test caches the first part. Two parts are written, then a seek into the tail of ++ * the first part. The subsequent write reaches the end of part 1, flagging a potential ++ * cache miss is imminent if more data is written. However, an EOS event is received ++ * which successfully finalizes the file and does not result in a flow error. 
++ */ ++ #[ignore = "failing, needs investigation"] ++ #[tokio::test] ++ async fn test_s3_multipart_eos_on_cache_boundary() { ++ init(); ++ ++ let (region, bucket, key) = get_env_args("eos_on_cache_boundary"); ++ let uri = get_uri(®ion, &bucket, &key); ++ let buffer_size = 1024 * 1024; ++ let buffers_per_part = 5; ++ let part_size = buffer_size * buffers_per_part; ++ let num_buffers = 6; ++ ++ let mut h1 = gst_check::Harness::new_parse(&format!( ++ "awss3sink name=\"sink\" uri=\"{uri}\" num-cached-parts=1 part-size={part_size}" ++ )); ++ h1.set_src_caps(gst::Caps::builder("text/plain").build()); ++ h1.play(); ++ ++ // Push stream start, segment, and buffers ++ let mut segment = gst::FormattedSegment::::new(); ++ h1.push_event(gst::event::StreamStart::builder(&"test-stream").build()); ++ h1.push_event(gst::event::Segment::new(&segment)); ++ for i in 1..=num_buffers as u8 { ++ let buffer = make_buffer(&vec![i; buffer_size]); ++ h1.push(buffer).unwrap(); ++ } ++ ++ // Seek to the end of part 1 (a buffer from tail) ++ segment = gst::FormattedSegment::::new(); ++ segment.set_start(gst::format::Bytes::from_u64( ++ (part_size - buffer_size).try_into().unwrap(), ++ )); ++ assert!(h1.push_event(gst::event::Segment::new(&segment))); ++ ++ // Write a full buffer. The write should succeed because it only reaches the cache boundary. ++ assert!(h1.push(make_buffer(&vec![0xAA; buffer_size])).is_ok()); ++ ++ // EOS should be allowed and result in a finalized file. 
++ h1.push_event(gst::event::Eos::new()); ++ ++ // Verify ++ let mut h2 = gst_check::Harness::new("awss3src"); ++ h2.element().unwrap().set_property("uri", uri.clone()); ++ h2.play(); ++ ++ let buffer = pull_all_buffers_as_slice(&mut h2); ++ assert_eq!(buffer.len(), buffer_size * num_buffers); ++ assert!(buffer[buffer_size * 3..(buffer_size * 4 - 1)] ++ .iter() ++ .all(|&b| b == 0x04_u8)); ++ assert!(buffer[buffer_size * 4..(buffer_size * 5 - 1)] ++ .iter() ++ .all(|&b| b == 0xAA_u8)); ++ assert!(buffer[buffer_size * 5..(buffer_size * 6 - 1)] ++ .iter() ++ .all(|&b| b == 0x06_u8)); ++ ++ delete_object(region.clone(), &bucket, &key).await; ++ } ++ + #[ignore = "failing, needs investigation"] + #[tokio::test] + async fn test_s3_multipart_unicode() { +-- +2.34.1 +