From 2c732835cb813545d36b049c268102effc92544f Mon Sep 17 00:00:00 2001 From: Alistair Israel Date: Sun, 15 Mar 2026 10:10:23 -0400 Subject: [PATCH 1/2] refactor(head): use build_reader and apply_select_and_display like tail/sample Made-with: Cursor --- src/bin/datu/commands/head.rs | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/src/bin/datu/commands/head.rs b/src/bin/datu/commands/head.rs index 932ef83..5e56a82 100644 --- a/src/bin/datu/commands/head.rs +++ b/src/bin/datu/commands/head.rs @@ -1,28 +1,33 @@ use anyhow::Result; +use anyhow::bail; +use datu::FileType; use datu::cli::HeadsOrTails; -use datu::pipeline::RecordBatchReaderSource; -use datu::pipeline::Step; -use datu::pipeline::VecRecordBatchReaderSource; -use datu::pipeline::display::DisplayWriterStep; -use datu::pipeline::read_to_batches; +use datu::pipeline::build_reader; +use datu::pipeline::display::apply_select_and_display; +use datu::pipeline::record_batch_filter::parse_select_step; use datu::resolve_input_file_type; /// head command implementation: print the first N lines of an Avro, CSV, Parquet, or ORC file. 
pub async fn head(args: HeadsOrTails) -> Result<()> { let input_file_type = resolve_input_file_type(args.input, &args.input_path)?; - let batches = read_to_batches( + match input_file_type { + FileType::Parquet | FileType::Avro | FileType::Csv | FileType::Orc => {} + _ => bail!("Only Parquet, Avro, CSV, and ORC are supported for head"), + } + let reader_step = build_reader( &args.input_path, input_file_type, - &args.select, Some(args.number), + None, args.input_headers, + )?; + apply_select_and_display( + reader_step, + parse_select_step(&args.select), + args.output, + args.sparse, + args.output_headers.unwrap_or(true), ) - .await?; - let reader_step: RecordBatchReaderSource = Box::new(VecRecordBatchReaderSource::new(batches)); - let display_step = DisplayWriterStep { - output_format: args.output, - sparse: args.sparse, - headers: args.output_headers.unwrap_or(true), - }; - display_step.execute(reader_step).await.map_err(Into::into) + .await + .map_err(Into::into) } From 23da2db1c29aab227591ec57662bb4b1ae198e35 Mon Sep 17 00:00:00 2001 From: Alistair Israel Date: Sun, 15 Mar 2026 10:27:58 -0400 Subject: [PATCH 2/2] Enhance CSV and ORC reading capabilities by adding row limit support - Updated `ReadCsvStep` to include an optional `limit` parameter for controlling the maximum number of rows read. - Modified `build_reader` function to pass the `limit` when reading CSV files. - Adjusted `head` command to set an offset of 0 when limiting rows for ORC files, ensuring correct row selection. - Refactored `select_columns_to_batches` to improve column selection logic and handle empty specifications more gracefully. 
--- src/bin/datu/commands/head.rs | 3 ++- src/cli/repl.rs | 2 +- src/pipeline.rs | 1 + src/pipeline/csv.rs | 18 ++++++++++++++++- src/pipeline/select.rs | 38 +++++++++++++++++------------------ 5 files changed, 40 insertions(+), 22 deletions(-) diff --git a/src/bin/datu/commands/head.rs b/src/bin/datu/commands/head.rs index 5e56a82..e00f8b2 100644 --- a/src/bin/datu/commands/head.rs +++ b/src/bin/datu/commands/head.rs @@ -14,11 +14,12 @@ pub async fn head(args: HeadsOrTails) -> Result<()> { FileType::Parquet | FileType::Avro | FileType::Csv | FileType::Orc => {} _ => bail!("Only Parquet, Avro, CSV, and ORC are supported for head"), } + // Pass offset=0 when limiting so ORC row selection applies (it requires both offset and limit). let reader_step = build_reader( &args.input_path, input_file_type, Some(args.number), - None, + Some(0), args.input_headers, )?; apply_select_and_display( diff --git a/src/cli/repl.rs b/src/cli/repl.rs index 6095028..58418ff 100644 --- a/src/cli/repl.rs +++ b/src/cli/repl.rs @@ -219,7 +219,7 @@ impl ReplPipelineBuilder { let batches = self.batches.take().ok_or_else(|| { Error::GenericError("select requires a preceding read in the pipe".to_string()) })?; - let selected = select::select_columns_to_batches(batches, columns).await?; + let selected = select::select_columns_to_batches(batches, columns)?; self.batches = Some(selected); Ok(()) } diff --git a/src/pipeline.rs b/src/pipeline.rs index 62ecc3f..63969c4 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -140,6 +140,7 @@ pub fn build_reader( FileType::Csv => Box::new(ReadCsvStep { path: path.to_string(), has_header: csv_has_header, + limit, }), FileType::Orc => Box::new(ReadOrcStep { args: ReadArgs { diff --git a/src/pipeline/csv.rs b/src/pipeline/csv.rs index cd24841..e1dd47b 100644 --- a/src/pipeline/csv.rs +++ b/src/pipeline/csv.rs @@ -18,6 +18,8 @@ use crate::pipeline::batch_write::write_record_batches_with_sink; pub struct ReadCsvStep { pub path: String, pub has_header: Option<bool>, 
+ /// Maximum number of rows to read. None means read all. + pub limit: Option<usize>, } impl Source for ReadCsvStep { @@ -30,12 +32,26 @@ impl Source for ReadCsvStep { }) .map_err(|e| Error::GenericError(e.to_string()))?; - let batches = tokio::task::block_in_place(|| { + let mut batches = tokio::task::block_in_place(|| { let handle = tokio::runtime::Handle::current(); handle.block_on(df.collect()) }) .map_err(|e| Error::GenericError(e.to_string()))?; + if let Some(limit) = self.limit { + let mut result = Vec::new(); + let mut remaining = limit; + for batch in batches { + if remaining == 0 { + break; + } + let rows = batch.num_rows().min(remaining); + result.push(batch.slice(0, rows)); + remaining -= rows; + } + batches = result; + } + Ok(Box::new(VecRecordBatchReader::new(batches))) } } diff --git a/src/pipeline/select.rs b/src/pipeline/select.rs index bb3eeb6..e23e6dc 100644 --- a/src/pipeline/select.rs +++ b/src/pipeline/select.rs @@ -1,4 +1,5 @@ -//! DataFusion DataFrame API for column selection. +//! Column selection: ColumnSpec resolution (shared by CLI and REPL), in-memory projection +//! (Arrow), and DataFusion-based read helpers. use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; @@ -103,30 +104,29 @@ pub async fn read_avro_select( Ok(Box::new(VecRecordBatchReaderSource::new(batches))) } -/// Applies column selection to record batches using the DataFusion DataFrame API. -/// Returns the selected batches directly (for use when RecordBatchReaderSource is not needed). -/// Resolves ColumnSpec against the schema: Exact uses case-sensitive match, CaseInsensitive uses -/// case-insensitive match. -pub async fn select_columns_to_batches( +/// Applies column selection to record batches using the same resolution and projection +/// as the streaming SelectColumnsStep: resolve_column_specs then Arrow project by indices. 
+pub fn select_columns_to_batches( batches: Vec<RecordBatch>, specs: &[ColumnSpec], ) -> crate::Result<Vec<RecordBatch>> { - if batches.is_empty() { + if batches.is_empty() || specs.is_empty() { return Ok(batches); } let schema = batches[0].schema(); - let columns = resolve_column_specs(&schema, specs)?; - let ctx = SessionContext::new(); - let col_refs: Vec<&str> = columns.iter().map(String::as_str).collect(); - let df = ctx - .read_batches(batches) - .map_err(|e| crate::Error::GenericError(e.to_string()))?; - let df = df - .select_columns(&col_refs) - .map_err(|e| crate::Error::GenericError(e.to_string()))?; - df.collect() - .await - .map_err(|e| crate::Error::GenericError(e.to_string())) + let column_names = resolve_column_specs(schema.as_ref(), specs)?; + let indices: Vec<usize> = column_names + .iter() + .map(|col| { + schema + .index_of(col) + .map_err(|e| crate::Error::GenericError(format!("Column '{col}' not found: {e}"))) + }) + .collect::<crate::Result<Vec<usize>>>()?; + batches + .into_iter() + .map(|batch| batch.project(&indices).map_err(crate::Error::from)) + .collect() } /// Applies column selection to record batches using the DataFusion DataFrame API.