Skip to content

Commit c188abc

Browse files
committed
feat: update boundary handling to use u64 for sizes and add error handling for zero scan window
1 parent 9e1c07d commit c188abc

2 files changed

Lines changed: 52 additions & 18 deletions

File tree

datafusion/datasource-json/src/boundary_utils.rs

Lines changed: 40 additions & 14 deletions
Original file line number · Diff line number · Diff line change
@@ -20,7 +20,7 @@ use datafusion_common::{DataFusionError, Result};
2020
use object_store::{ObjectStore, path::Path};
2121
use std::sync::Arc;
2222

23-
pub const DEFAULT_BOUNDARY_WINDOW: usize = 4096; // 4KB
23+
pub const DEFAULT_BOUNDARY_WINDOW: u64 = 4096; // 4KB
2424

2525
/// Fetch bytes for [start, end) and align boundaries in memory.
2626
///
@@ -36,20 +36,26 @@ pub const DEFAULT_BOUNDARY_WINDOW: usize = 4096; // 4KB
3636
pub async fn get_aligned_bytes(
3737
store: &Arc<dyn ObjectStore>,
3838
location: &Path,
39-
start: usize,
40-
end: usize,
41-
file_size: usize,
39+
start: u64,
40+
end: u64,
41+
file_size: u64,
4242
terminator: u8,
43-
scan_window: usize,
43+
scan_window: u64,
4444
) -> Result<Option<Bytes>> {
45+
if scan_window == 0 {
46+
return Err(DataFusionError::Internal(
47+
"scan_window must be greater than 0".to_string(),
48+
));
49+
}
50+
4551
if start >= end || start >= file_size {
4652
return Ok(None);
4753
}
4854

4955
let fetch_start = start.saturating_sub(1);
5056
let fetch_end = std::cmp::min(end, file_size);
5157
let bytes = store
52-
.get_range(location, (fetch_start as u64)..(fetch_end as u64))
58+
.get_range(location, fetch_start..fetch_end)
5359
.await
5460
.map_err(|e| DataFusionError::External(Box::new(e)))?;
5561

@@ -80,12 +86,15 @@ pub async fn get_aligned_bytes(
8086
}
8187

8288
// Slow path: need to extend, preallocate capacity
83-
let mut buffer = Vec::with_capacity(data.len() + scan_window);
89+
let scan_window_usize = usize::try_from(scan_window).map_err(|_| {
90+
DataFusionError::Internal("scan_window must fit in usize".to_string())
91+
})?;
92+
let mut buffer = Vec::with_capacity(data.len().saturating_add(scan_window_usize));
8493
buffer.extend_from_slice(&data);
85-
let mut cursor = fetch_end as u64;
94+
let mut cursor = fetch_end;
8695

87-
while cursor < file_size as u64 {
88-
let chunk_end = std::cmp::min(cursor + scan_window as u64, file_size as u64);
96+
while cursor < file_size {
97+
let chunk_end = std::cmp::min(cursor.saturating_add(scan_window), file_size);
8998
let chunk = store
9099
.get_range(location, cursor..chunk_end)
91100
.await
@@ -222,6 +231,23 @@ mod tests {
222231
assert_eq!(result.unwrap().as_ref(), b"line1");
223232
}
224233

234+
#[tokio::test]
235+
async fn test_get_aligned_bytes_rejects_zero_scan_window() {
236+
let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
237+
let path = Path::from("test.json");
238+
239+
store.put(&path, "line1\n".into()).await.unwrap();
240+
241+
let err = get_aligned_bytes(&store, &path, 0, 6, 6, b'\n', 0)
242+
.await
243+
.unwrap_err();
244+
245+
assert!(
246+
matches!(err, DataFusionError::Internal(ref msg) if msg.contains("scan_window")),
247+
"unexpected error: {err}"
248+
);
249+
}
250+
225251
#[derive(Debug)]
226252
struct CountingObjectStore {
227253
inner: Arc<dyn ObjectStore>,
@@ -338,10 +364,10 @@ mod tests {
338364
let path = Path::from("amplification.json");
339365

340366
let data = build_fixed_lines(128, 16_384);
341-
let file_size = data.len();
367+
let file_size = data.len() as u64;
342368
inner.put(&path, data.into()).await.unwrap();
343369

344-
let start = 1_000_003usize;
370+
let start = 1_000_003u64;
345371
let raw_end = start + 64_000;
346372
let end = (raw_end / 128).max(1) * 128;
347373

@@ -350,8 +376,8 @@ mod tests {
350376
object_meta,
351377
partition_values: vec![],
352378
range: Some(FileRange {
353-
start: start as i64,
354-
end: end as i64,
379+
start: i64::try_from(start).unwrap(),
380+
end: i64::try_from(end).unwrap(),
355381
}),
356382
statistics: None,
357383
ordering: None,

datafusion/datasource-json/src/source.rs

Lines changed: 12 additions & 4 deletions
Original file line number · Diff line number · Diff line change
@@ -187,7 +187,7 @@ impl FileOpener for JsonOpener {
187187
let file_compression_type = self.file_compression_type.to_owned();
188188

189189
Ok(Box::pin(async move {
190-
let file_size = partitioned_file.object_meta.size as usize;
190+
let file_size = partitioned_file.object_meta.size;
191191
let location = &partitioned_file.object_meta.location;
192192

193193
let file_range = if file_compression_type.is_compressed() {
@@ -197,8 +197,16 @@ impl FileOpener for JsonOpener {
197197
};
198198

199199
if let Some(file_range) = file_range.as_ref() {
200-
let raw_start = file_range.start as usize;
201-
let raw_end = file_range.end as usize;
200+
let raw_start = u64::try_from(file_range.start).map_err(|_| {
201+
DataFusionError::Internal(
202+
"file range start must be non-negative".to_string(),
203+
)
204+
})?;
205+
let raw_end = u64::try_from(file_range.end).map_err(|_| {
206+
DataFusionError::Internal(
207+
"file range end must be non-negative".to_string(),
208+
)
209+
})?;
202210
let aligned_bytes = get_aligned_bytes(
203211
&store,
204212
location,
@@ -245,7 +253,7 @@ impl FileOpener for JsonOpener {
245253
Some(_) => {
246254
file.seek(SeekFrom::Start(result.range.start as _))?;
247255
let limit = result.range.end - result.range.start;
248-
file_compression_type.convert_read(file.take(limit as u64))?
256+
file_compression_type.convert_read(file.take(limit))?
249257
}
250258
};
251259

0 commit comments

Comments (0)